You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/09 12:17:27 UTC

[drill] 01/04: DRILL-6575: Add store.hive.conf.properties option to allow setting Hive properties at session level

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 139f7156bcb2f6fef5b36f116c9c1b6095fc4b9c
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Thu Jul 5 20:48:37 2018 +0000

    DRILL-6575: Add store.hive.conf.properties option to allow setting Hive properties at session level
    
    closes #1365
---
 ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java |   6 +-
 .../ConvertHiveParquetScanToDrillParquetScan.java  |  21 ++--
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  17 +++-
 .../store/hive/HiveDrillNativeParquetScan.java     |  20 +++-
 .../org/apache/drill/exec/store/hive/HiveScan.java | 105 ++++++++++----------
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  57 ++++++++---
 .../exec/store/hive/HiveStoragePluginConfig.java   |  21 ++--
 .../apache/drill/exec/store/hive/HiveSubScan.java  |  22 +++--
 .../drill/exec/store/hive/HiveUtilities.java       | 108 ++++++++++++++++-----
 .../main/resources/bootstrap-storage-plugins.json  |   2 +-
 .../exec/TestHiveDrillNativeParquetReader.java     |  17 ++++
 .../apache/drill/exec/hive/TestHiveStorage.java    |  21 ++++
 .../exec/hive/TestInfoSchemaOnHiveStorage.java     |   2 +
 .../exec/store/hive/HiveTestDataGenerator.java     |  80 ++++++++-------
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../org/apache/drill/exec/opt/BasicOptimizer.java  |  53 +++++-----
 .../drill/exec/planner/logical/DrillTable.java     |  11 ++-
 .../drill/exec/planner/sql/SqlConverter.java       |  34 +++----
 .../exec/server/options/SystemOptionManager.java   |  43 ++++----
 .../drill/exec/store/AbstractStoragePlugin.java    |  12 +++
 .../org/apache/drill/exec/store/StoragePlugin.java |  45 ++++++---
 .../java-exec/src/main/resources/drill-module.conf |   4 +
 22 files changed, 457 insertions(+), 247 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index 50fee9c..3bc33b3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -88,7 +88,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
       HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
       HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
       HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
-          hiveScan.getStoragePlugin().getHiveConf());
+          hiveScan.getHiveConf());
       if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
         // table is empty, use original scan
         return;
@@ -134,7 +134,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
     List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
         .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
         .collect(Collectors.toList());
-    JsonTableGroupScan nariveMapRDBScan =
+    JsonTableGroupScan nativeMapRDBScan =
         new JsonTableGroupScan(
             hiveScan.getUserName(),
             hiveScan.getStoragePlugin(),
@@ -155,7 +155,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
         hiveScanRel.getCluster(),
         hiveScanRel.getTraitSet(),
         hiveScanRel.getTable(),
-        nariveMapRDBScan,
+        nativeMapRDBScan,
         nativeScanRowType,
         hiveScanCols);
   }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 2a2f4fb..ea71157 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.planner.sql.logical;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -43,6 +41,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -88,7 +88,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
       final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
       final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
 
-      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf());
       final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
 
       if (logicalInputSplits.isEmpty()) {
@@ -123,7 +123,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
    * Create mapping of Hive partition column to directory column mapping.
    */
   private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
-    final Map<String, String> partitionColMapping = Maps.newHashMap();
+    final Map<String, String> partitionColMapping = new HashMap<>();
     int i = 0;
     for (FieldSchema col : hiveTable.getPartitionKeys()) {
       partitionColMapping.put(col.getName(), partitionColumnLabel+i);
@@ -143,8 +143,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
 
-    final List<String> nativeScanColNames = Lists.newArrayList();
-    final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+    final List<String> nativeScanColNames = new ArrayList<>();
+    final List<RelDataType> nativeScanColTypes = new ArrayList<>();
     for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
       final String dirColName = partitionColMapping.get(field.getName());
       if (dirColName != null) { // partition column
@@ -161,8 +161,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     // Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of
     // columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is,
     // unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'.
-    final List<SchemaPath> nativeScanCols = Lists.newArrayList();
-    for(SchemaPath colName : hiveScanRel.getColumns()) {
+    final List<SchemaPath> nativeScanCols = new ArrayList<>();
+    for (SchemaPath colName : hiveScanRel.getColumns()) {
       final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath());
       if (partitionCol != null) {
         nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
@@ -177,7 +177,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
             hiveScan.getUserName(),
             nativeScanCols,
             hiveScan.getStoragePlugin(),
-            logicalInputSplits);
+            logicalInputSplits,
+            hiveScan.getConfProperties());
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
@@ -194,7 +195,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
   private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
       final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
 
-    final List<RexNode> rexNodes = Lists.newArrayList();
+    final List<RexNode> rexNodes = new ArrayList<>();
     final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
     final RelDataType hiveScanRowType = hiveScanRel.getRowType();
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index e227015..d334ec8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 @JsonTypeName("hive-drill-native-parquet-row-group-scan")
 public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan {
@@ -45,6 +46,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -53,12 +55,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
                                             @JsonProperty("columns") List<SchemaPath> columns,
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                            @JsonProperty("confProperties") Map<String, String> confProperties,
                                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
         columns,
         hivePartitionHolder,
+        confProperties,
         filter);
   }
 
@@ -67,11 +71,13 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             List<RowGroupReadEntry> rowGroupReadEntries,
                                             List<SchemaPath> columns,
                                             HivePartitionHolder hivePartitionHolder,
+                                            Map<String, String> confProperties,
                                             LogicalExpression filter) {
     super(userName, rowGroupReadEntries, columns, filter);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
   }
 
   @JsonProperty
@@ -84,6 +90,11 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getHiveStoragePlugin() {
     return hiveStoragePlugin;
@@ -92,7 +103,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
   }
 
   @Override
@@ -102,7 +113,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
   }
 
   @Override
@@ -114,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
     Path path = new Path(rowGroupReadEntry.getPath()).getParent();
     return new ProjectionPusher().pushProjectionsAndFilters(
-        new JobConf(hiveStoragePlugin.getHiveConf()),
+        new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
         path.getParent());
   }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 03a80d3..a973fa1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -62,7 +62,8 @@ import java.util.Map;
 public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
 
   private final HiveStoragePlugin hiveStoragePlugin;
-  private HivePartitionHolder hivePartitionHolder;
+  private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -71,10 +72,12 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     @JsonProperty("columns") List<SchemaPath> columns,
                                     @JsonProperty("entries") List<ReadEntryWithPath> entries,
                                     @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                    @JsonProperty("confProperties") Map<String, String> confProperties,
                                     @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
     this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
 
     init();
   }
@@ -82,19 +85,22 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
-                                    List<LogicalInputSplit> logicalInputSplits) throws IOException {
-    this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
+                                    List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties,
                                     LogicalExpression filter) throws IOException {
     super(userName, columns, new ArrayList<>(), filter);
 
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.hivePartitionHolder = new HivePartitionHolder();
+    this.confProperties = confProperties;
 
     for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
       Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
@@ -122,6 +128,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
     super(that);
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.hivePartitionHolder = that.hivePartitionHolder;
+    this.confProperties = that.confProperties;
   }
 
   @JsonProperty
@@ -134,6 +141,11 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
     List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
@@ -142,7 +154,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       List<String> values = hivePartitionHolder.get(readEntry.getPath());
       subPartitionHolder.add(readEntry.getPath(), values);
     }
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 11d47f3..d631740 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -50,8 +50,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
 
@@ -64,6 +62,7 @@ public class HiveScan extends AbstractGroupScan {
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveReadEntry hiveReadEntry;
   private final HiveMetadataProvider metadataProvider;
+  private final Map<String, String> confProperties;
 
   private List<List<LogicalInputSplit>> mappings;
   private List<LogicalInputSplit> inputSplits;
@@ -75,22 +74,24 @@ public class HiveScan extends AbstractGroupScan {
                   @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
                   @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JsonProperty("confProperties") final Map<String, String> confProperties,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
         (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
         columns,
-        null);
+        null, confProperties);
   }
 
   public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
-      final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
+                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
     if (metadataProvider == null) {
-      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf());
+      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
     } else {
       this.metadataProvider = metadataProvider;
     }
@@ -102,10 +103,11 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveReadEntry = that.hiveReadEntry;
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
+    this.confProperties = that.confProperties;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
   }
 
   @JsonProperty
@@ -123,29 +125,37 @@ public class HiveScan extends AbstractGroupScan {
     return columns;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getStoragePlugin() {
     return hiveStoragePlugin;
   }
 
-  protected HiveMetadataProvider getMetadataProvider() {
-    return metadataProvider;
+  @JsonIgnore
+  public HiveConf getHiveConf() {
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
   }
 
-  private List<LogicalInputSplit> getInputSplits() {
-    if (inputSplits == null) {
-      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
-    }
-
-    return inputSplits;
+  @JsonIgnore
+  public boolean isNativeReader() {
+    return false;
   }
 
+  @Override
+  public boolean supportsPartitionFilterPushdown() {
+    List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+    return !(partitionKeys == null || partitionKeys.size() == 0);
+  }
 
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = new ArrayList<>();
     for (int i = 0; i < endpoints.size(); i++) {
-      mappings.add(new ArrayList<LogicalInputSplit>());
+      mappings.add(new ArrayList<>());
     }
     final int count = endpoints.size();
     final List<LogicalInputSplit> inputSplits = getInputSplits();
@@ -158,9 +168,9 @@ public class HiveScan extends AbstractGroupScan {
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
       final List<LogicalInputSplit> splits = mappings.get(minorFragmentId);
-      List<HivePartitionWrapper> parts = Lists.newArrayList();
-      final List<List<String>> encodedInputSplits = Lists.newArrayList();
-      final List<String> splitTypes = Lists.newArrayList();
+      List<HivePartitionWrapper> parts = new ArrayList<>();
+      final List<List<String>> encodedInputSplits = new ArrayList<>();
+      final List<String> splitTypes = new ArrayList<>();
       for (final LogicalInputSplit split : splits) {
         final Partition splitPartition = split.getPartition();
         if (splitPartition != null) {
@@ -176,7 +186,7 @@ public class HiveScan extends AbstractGroupScan {
       }
 
       final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -192,7 +202,7 @@ public class HiveScan extends AbstractGroupScan {
     final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) {
       endpointMap.put(endpoint.getAddress(), endpoint);
-      logger.debug("endpoing address: {}", endpoint.getAddress());
+      logger.debug("Endpoint address: {}", endpoint.getAddress());
     }
     final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
@@ -204,7 +214,7 @@ public class HiveScan extends AbstractGroupScan {
       for (final LogicalInputSplit split : inputSplits) {
         final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
         for (final String loc : split.getLocations()) {
-          logger.debug("split location: {}", loc);
+          logger.debug("Split location: {}", loc);
           final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
@@ -218,13 +228,8 @@ public class HiveScan extends AbstractGroupScan {
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
-      Preconditions.checkNotNull(ep);
-    }
-    for (final EndpointAffinity a : affinityMap.values()) {
-      Preconditions.checkNotNull(a.getEndpoint());
-    }
-    return Lists.newArrayList(affinityMap.values());
+
+    return new ArrayList<>(affinityMap.values());
   }
 
   @Override
@@ -243,21 +248,8 @@ public class HiveScan extends AbstractGroupScan {
     }
   }
 
-  protected int getSerDeOverheadFactor() {
-    final int projectedColumnCount;
-    if (Utilities.isStarQuery(columns)) {
-      Table hiveTable = hiveReadEntry.getTable();
-      projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
-    } else {
-      // In cost estimation, # of project columns should be >= 1, even for skipAll query.
-      projectedColumnCount = Math.max(columns.size(), 1);
-    }
-
-    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
-  }
-
   @Override
-  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) {
     return new HiveScan(this);
   }
 
@@ -275,6 +267,7 @@ public class HiveScan extends AbstractGroupScan {
         + ", numPartitions=" + numPartitions
         + ", partitions= " + partitions
         + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
+        + ", confProperties=" + confProperties
         + "]";
   }
 
@@ -290,22 +283,24 @@ public class HiveScan extends AbstractGroupScan {
     return true;
   }
 
-  // Return true if the current table is partitioned false otherwise
-  public boolean supportsPartitionFilterPushdown() {
-    final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
-    if (partitionKeys == null || partitionKeys.size() == 0) {
-      return false;
+  private List<LogicalInputSplit> getInputSplits() {
+    if (inputSplits == null) {
+      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
-    return true;
-  }
 
-  @JsonIgnore
-  public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return inputSplits;
   }
 
-  @JsonIgnore
-  public boolean isNativeReader() {
-    return false;
+  private int getSerDeOverheadFactor() {
+    final int projectedColumnCount;
+    if (Utilities.isStarQuery(columns)) {
+      Table hiveTable = hiveReadEntry.getTable();
+      projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
+    } else {
+      // In cost estimation, # of project columns should be >= 1, even for skipAll query.
+      projectedColumnCount = Math.max(columns.size(), 1);
+    }
+
+    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
   }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index ced8b01..adf1348 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -18,27 +18,34 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -53,17 +60,16 @@ import org.apache.thrift.transport.TTransportException;
 
 public class HiveStoragePlugin extends AbstractStoragePlugin {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
 
   private final HiveStoragePluginConfig config;
   private HiveSchemaFactory schemaFactory;
   private final HiveConf hiveConf;
 
-  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name)
-      throws ExecutionSetupException {
+  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
     super(context, name);
     this.config = config;
-    this.hiveConf = createHiveConf(config.getHiveConfigOverride());
+    this.hiveConf = HiveUtilities.generateHiveConf(config.getConfigProps());
     this.schemaFactory = new HiveSchemaFactory(this, name, hiveConf);
   }
 
@@ -76,7 +82,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+  }
+
+  @Override
   public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
     HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
     try {
       if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -84,7 +100,26 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             "Querying views created in Hive from Drill is not supported in current version.");
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns, null);
+      Map<String, String> confProperties = new HashMap<>(); // stays empty when no session options are given
+      if (options != null) {
+        String value = StringEscapeUtils.unescapeJava(options.getString(ExecConstants.HIVE_CONF_PROPERTIES));
+        logger.trace("[{}] is set to {}.", ExecConstants.HIVE_CONF_PROPERTIES, value);
+        try {
+          Properties properties = new Properties();
+          properties.load(new StringReader(value));
+          confProperties = properties.stringPropertyNames().stream()
+              .collect(
+                Collectors.toMap(
+                  Function.identity(),
+                  properties::getProperty,
+                  (o, n) -> n));
+        } catch (IOException e) {
+          // best effort: keep the empty map, but pass the exception so the cause is logged
+          logger.warn("Unable to parse Hive conf properties {}, ignoring them.", value, e);
+        }
+      }
+
+      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
@@ -181,14 +216,4 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
     return ruleBuilder.build();
   }
 
-  private static HiveConf createHiveConf(final Map<String, String> hiveConfigOverride) {
-    final HiveConf hiveConf = new HiveConf();
-    for(Entry<String, String> config : hiveConfigOverride.entrySet()) {
-      final String key = config.getKey();
-      final String value = config.getValue();
-      hiveConf.set(key, value);
-      logger.trace("HiveConfig Override {}={}", key, value);
-    }
-    return hiveConf;
-  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index b6f15c8..d812468 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -19,28 +19,31 @@ package org.apache.drill.exec.store.hive;
 
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonAlias;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(HiveStoragePluginConfig.NAME)
 public class HiveStoragePluginConfig extends StoragePluginConfigBase {
-  @JsonProperty
-  public Map<String, String> configProps;
 
   public static final String NAME = "hive";
 
-  @JsonIgnore
-  public Map<String, String> getHiveConfigOverride() {
-    return configProps;
-  }
+  private final Map<String, String> configProps;
 
   @JsonCreator
-  public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
-    this.configProps = props;
+  public HiveStoragePluginConfig(@JsonProperty("configProps")
+                                 // previously two names were allowed due to incorrectly written ser / der logic
+                                 // allowing to use both during deserialization for backward compatibility
+                                 @JsonAlias("config") Map<String, String> configProps) {
+    this.configProps = configProps;
+  }
+
+  @JsonProperty
+  public Map<String, String> getConfigProps() {
+    return configProps;
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 8ca8647..0acec2d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.collect.ImmutableSet;
@@ -55,6 +56,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private final HiveTableWithColumnCache table;
   private final List<HivePartition> partitions;
   private final List<SchemaPath> columns;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@@ -63,22 +65,24 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                      @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
-                     @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig)
+                     @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
+                     @JsonProperty("confProperties") Map<String, String> confProperties)
       throws IOException, ExecutionSetupException, ReflectiveOperationException {
     this(userName,
         splits,
         hiveReadEntry,
         splitClasses,
         columns,
-        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig));
+        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties);
   }
 
   public HiveSubScan(final String userName,
                      final List<List<String>> splits,
                      final HiveReadEntry hiveReadEntry,
-                      final List<String> splitClasses,
+                     final List<String> splitClasses,
                      final List<SchemaPath> columns,
-                     final HiveStoragePlugin hiveStoragePlugin)
+                     final HiveStoragePlugin hiveStoragePlugin,
+                     final Map<String, String> confProperties)
     throws IOException, ReflectiveOperationException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
@@ -88,6 +92,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     this.splitClasses = splitClasses;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
 
     for (int i = 0; i < splits.size(); i++) {
       inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i)));
@@ -119,6 +124,11 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return hiveStoragePlugin.getConfig();
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveTableWithColumnCache getTable() {
     return table;
@@ -141,7 +151,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
 
   @JsonIgnore
   public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
   }
 
   @Override
@@ -152,7 +162,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c8efb65..6fc567e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -93,7 +93,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_
 public class HiveUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
 
-  /** Partition value is received in string format. Convert it into appropriate object based on the type. */
+  /**
+   * Partition value is received in string format. Convert it into appropriate object based on the type.
+   *
+   * @param typeInfo type info
+   * @param value partition value in string format
+   * @param defaultPartitionValue default partition value
+   * @return converted object
+   */
   public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
     if (typeInfo.getCategory() != Category.PRIMITIVE) {
       // In Hive only primitive types are allowed as partition column types.
@@ -147,6 +154,15 @@ public class HiveUtilities {
     return null;
   }
 
+  /**
+   * Populates vector with given value based on its type.
+   *
+   * @param vector vector instance
+   * @param managedBuffer Drill buffer
+   * @param val value
+   * @param start start position
+   * @param end end position
+   */
   public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val,
       final int start, final int end) {
     TypeProtos.MinorType type = vector.getField().getType().getMinorType();
@@ -307,6 +323,13 @@ public class HiveUtilities {
     }
   }
 
+  /**
+   * Obtains major type from given type info holder.
+   *
+   * @param typeInfo type info holder
+   * @param options session options
+   * @return appropriate major type, null otherwise. For some types may throw unsupported exception.
+   */
   public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE: {
@@ -343,8 +366,14 @@ public class HiveUtilities {
     return null;
   }
 
-  public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
-                                                                           OptionSet options) {
+  /**
+   * Obtains minor type from given primitive type info holder.
+   *
+   * @param primitiveTypeInfo primitive type info holder
+   * @param options session options
+   * @return appropriate minor type, otherwise throws unsupported type exception
+   */
+  public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, OptionSet options) {
     switch(primitiveTypeInfo.getPrimitiveCategory()) {
       case BINARY:
         return TypeProtos.MinorType.VARBINARY;
@@ -392,10 +421,8 @@ public class HiveUtilities {
    * @param job {@link JobConf} instance needed incase the table is StorageHandler based table.
    * @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables).
    * @param table Table object
-   * @throws Exception
    */
-  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd,
-      final Table table) throws Exception {
+  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table) throws Exception {
     final String inputFormatName = sd.getInputFormat();
     if (Strings.isNullOrEmpty(inputFormatName)) {
       final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
@@ -426,26 +453,23 @@ public class HiveUtilities {
   }
 
   /**
-   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table
-   * to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
+   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
+   * which also adds parameters from table to properties returned by that method.
    *
    * @param partition the source of partition level parameters
    * @param table     the source of table level parameters
    * @return properties
    */
   public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
-    final Properties properties;
     restoreColumns(table, partition);
-    properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+    Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
 
     // SerDe expects properties from Table, but above call doesn't add Table properties.
     // Include Table properties in final list in order to not to break SerDes that depend on
     // Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
-    for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
-      if (entry.getKey() != null && entry.getKey() != null) {
-        properties.put(entry.getKey(), entry.getValue());
-      }
-    }
+    table.getParameters().entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> properties.put(e.getKey(), e.getValue()));
 
     return properties;
   }
@@ -453,8 +477,8 @@ public class HiveUtilities {
   /**
    * Sets columns from table cache to table and partition.
    *
+   * @param table the source of column lists cache
    * @param partition partition which will set column list
-   * @param table     the source of column lists cache
    */
   public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
     // exactly the same column lists for partitions or table
@@ -471,6 +495,9 @@ public class HiveUtilities {
    * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
    * which also sets columns from table cache to table and returns properties returned by
    * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+   *
+   * @param table Hive table with cached columns
+   * @return Hive table metadata
    */
   public static Properties getTableMetadata(HiveTableWithColumnCache table) {
     restoreColumns(table, null);
@@ -478,13 +505,18 @@ public class HiveUtilities {
       table.getDbName(), table.getTableName(), table.getPartitionKeys());
   }
 
+  /**
+   * Generates unsupported types exception message with list of supported types
+   * and throws user exception.
+   *
+   * @param unsupportedType unsupported type
+   */
   public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
-    StringBuilder errMsg = new StringBuilder();
-    errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));
-    errMsg.append(System.getProperty("line.separator"));
-    errMsg.append("Following Hive data types are supported in Drill for querying: ");
-    errMsg.append(
-        "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
+    StringBuilder errMsg = new StringBuilder()
+        .append("Unsupported Hive data type ").append(unsupportedType).append(". ")
+        .append(System.lineSeparator())
+        .append("Following Hive data types are supported in Drill for querying: ")
+        .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
 
     throw UserException.unsupportedError()
         .message(errMsg.toString())
@@ -633,7 +665,7 @@ public class HiveUtilities {
   }
 
   /**
-   * Get the input format from given {@link StorageDescriptor}
+   * Get the input format from given {@link StorageDescriptor}.
    *
    * @param properties table properties
    * @param hiveReadEntry hive read entry
@@ -681,5 +713,35 @@ public class HiveUtilities {
     }
     return false;
   }
+
+  /**
+   * Creates new HiveConf and sets each of the given configuration properties on it.
+   *
+   * @param properties config properties (name to value) to set, must not be null
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(Map<String, String> properties) {
+    logger.trace("Override HiveConf with the following properties {}", properties);
+    HiveConf hiveConf = new HiveConf();
+    properties.forEach(hiveConf::set);
+    return hiveConf;
+  }
+
+  /**
+   * Creates HiveConf based on changed properties of given HiveConf and configuration properties.
+   *
+   * @param hiveConf hive conf
+   * @param properties config properties which override values from hive conf, null is treated as empty
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> properties) {
+    Properties changedProperties = hiveConf.getChangedProperties();
+    if (properties != null) {
+      changedProperties.putAll(properties);
+    }
+    HiveConf newHiveConf = new HiveConf();
+    changedProperties.stringPropertyNames().forEach(name -> newHiveConf.set(name, changedProperties.getProperty(name)));
+    return newHiveConf;
+  }
 }
 
diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
index d06220f..018189c 100644
--- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,7 @@
   "storage":{
     hive : {
       type:"hive",
-      config : {
+      configProps : {
         "hive.metastore.uris" : "",
         "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true",
         "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 556deb2..ea8d5df 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -152,6 +152,12 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
     // checks only group scan
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
   }
 
   @Test
@@ -243,4 +249,15 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
     testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
   }
 
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 25393e7..94f39b8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -412,6 +412,27 @@ public class TestHiveStorage extends HiveTestBase {
   public void testPhysicalPlanSubmission() throws Exception {
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.readtest");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(containsString("IOException: Not a file"));
+    test(query);
   }
 
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 80da976..c5c0d48 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -50,6 +50,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("hive.default", "partition_with_few_schemas")
         .baselineValues("hive.default", "kv_native")
         .baselineValues("hive.default", "kv_native_ext")
+        .baselineValues("hive.default", "sub_dir_table")
         .go();
 
     testBuilder()
@@ -254,6 +255,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
+        .baselineValues("DRILL", "hive.default", "sub_dir_table", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index f206999..074cb3b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -25,6 +25,7 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -80,7 +80,7 @@ public class HiveTestDataGenerator {
     this.whDir = whDir;
     this.dirTestWatcher = dirTestWatcher;
 
-    config = Maps.newHashMap();
+    config = new HashMap<>();
     config.put(ConfVars.METASTOREURIS.toString(), "");
     config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
     config.put("hive.metastore.warehouse.dir", whDir);
@@ -89,7 +89,9 @@ public class HiveTestDataGenerator {
 
   /**
    * Add Hive test storage plugin to the given plugin registry.
-   * @throws Exception
+   *
+   * @param pluginRegistry storage plugin registry
+   * @throws Exception in case if unable to update Hive storage plugin
    */
   public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) throws Exception {
     HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
@@ -101,7 +103,8 @@ public class HiveTestDataGenerator {
   /**
    * Update the current HiveStoragePlugin in given plugin registry with given <i>configOverride</i>.
    *
-   * @param configOverride
+   * @param pluginRegistry storage plugin registry
+   * @param configOverride config properties to be overridden
    * @throws DrillException if fails to update or no Hive plugin currently exists in given plugin registry.
    */
   public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, Map<String, String> configOverride)
@@ -113,7 +116,7 @@ public class HiveTestDataGenerator {
     }
 
     HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
-    newPluginConfig.getHiveConfigOverride().putAll(configOverride);
+    newPluginConfig.getConfigProps().putAll(configOverride);
 
     pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true);
   }
@@ -344,7 +347,7 @@ public class HiveTestDataGenerator {
         "charType CHAR(10))"
     );
 
-    /**
+    /*
      * Create a PARQUET table with all supported types.
      */
     executeQuery(hiveDriver,
@@ -542,6 +545,8 @@ public class HiveTestDataGenerator {
 
     createTestDataForDrillNativeParquetReaderTests(hiveDriver);
 
+    createSubDirTable(hiveDriver, testDataFile);
+
     ss.close();
   }
 
@@ -594,56 +599,61 @@ public class HiveTestDataGenerator {
       "location '%s'", thirdPartition));
   }
 
+  /** Creates external text table sub_dir_table whose only data file is placed in nested sub_dir directory. */
+  private void createSubDirTable(Driver hiveDriver, String testDataFile) {
+    String tableName = "sub_dir_table";
+    dirTestWatcher.copyResourceToRoot(Paths.get(testDataFile), Paths.get(tableName, "sub_dir", "data.txt"));
+    String tableLocation = Paths.get(dirTestWatcher.getRootDir().toURI().getPath(), tableName).toUri().getPath();
+
+    String tableDDL = String.format("create external table %s (key int, value string) " +
+        "row format delimited fields terminated by ',' stored as textfile location '%s'", tableName, tableLocation);
+    executeQuery(hiveDriver, tableDDL);
+  }
+
   private File getTempFile() throws Exception {
     return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile();
   }
 
   private String generateTestDataFile() throws Exception {
-    final File file = getTempFile();
-    PrintWriter printWriter = new PrintWriter(file);
-    for (int i=1; i<=5; i++) {
-      printWriter.println (String.format("%d, key_%d", i, i));
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      for (int i = 1; i <= 5; i++) {
+        printWriter.println(String.format("%d, key_%d", i, i));
+      }
     }
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateTestDataFileForPartitionInput() throws Exception {
-    final File file = getTempFile();
-
-    PrintWriter printWriter = new PrintWriter(file);
-
-    String partValues[] = {"1", "2", "null"};
-
-    for(int c = 0; c < partValues.length; c++) {
-      for(int d = 0; d < partValues.length; d++) {
-        for(int e = 0; e < partValues.length; e++) {
-          for (int i = 1; i <= 5; i++) {
-            Date date = new Date(System.currentTimeMillis());
-            Timestamp ts = new Timestamp(System.currentTimeMillis());
-            printWriter.printf("%s,%s,%s,%s,%s",
-                date.toString(), ts.toString(), partValues[c], partValues[d], partValues[e]);
-            printWriter.println();
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      String partValues[] = {"1", "2", "null"};
+      for (String partValue : partValues) {
+        for (String partValue1 : partValues) {
+          for (String partValue2 : partValues) {
+            for (int i = 1; i <= 5; i++) {
+              Date date = new Date(System.currentTimeMillis());
+              Timestamp ts = new Timestamp(System.currentTimeMillis());
+              printWriter.printf("%s,%s,%s,%s,%s", date.toString(), ts.toString(), partValue, partValue1, partValue2);
+              printWriter.println();
+            }
           }
         }
       }
     }
 
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateAllTypesDataFile() throws Exception {
     File file = getTempFile();
 
-    PrintWriter printWriter = new PrintWriter(file);
-    printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689," +
-        "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield," +
-        "2013-07-05 17:01:00,2013-07-05,charfield");
-    printWriter.println(",,,,,,,,,,,,,,,,");
-    printWriter.close();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"+
+          "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"+
+          "2013-07-05 17:01:00,2013-07-05,charfield");
+      printWriter.println(",,,,,,,,,,,,,,,,");
+    }
 
     return file.getPath();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 49f149b..4c840a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -410,6 +410,9 @@ public final class ExecConstants {
   public static final OptionValidator HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR =
       new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
 
+  public static final String HIVE_CONF_PROPERTIES = "store.hive.conf.properties";
+  public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new StringValidator(HIVE_CONF_PROPERTIES);
+
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final long SLICE_TARGET_DEFAULT = 100000l;
   public static final PositiveLongValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 36e74a0..e7518b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.opt;
 
-import com.google.common.collect.Lists;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -28,9 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
 import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.logical.data.Project;
@@ -53,6 +49,7 @@ import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -64,6 +61,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class BasicOptimizer extends Optimizer {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -99,8 +97,7 @@ public class BasicOptimizer extends Optimizer {
         .version(logicalProperties.version)
         .generator(logicalProperties.generator)
         .options(new JSONOptions(context.getOptions().getOptionList())).build();
-    final PhysicalPlan p = new PhysicalPlan(props, physOps);
-    return p;
+    return new PhysicalPlan(props, physOps);
   }
 
   public static class BasicOptimizationContext implements OptimizationContext {
@@ -128,30 +125,28 @@ public class BasicOptimizer extends Optimizer {
      */
     private final LogicalPlan logicalPlan;
 
-    public LogicalConverter(final LogicalPlan logicalPlan) {
+    LogicalConverter(final LogicalPlan logicalPlan) {
       this.logicalPlan = logicalPlan;
     }
 
     @Override
     public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-      final List<Ordering> orderDefs = Lists.newArrayList();
       PhysicalOperator input = groupBy.getInput().accept(this, value);
 
       if (groupBy.getKeys().size() > 0) {
-        for(NamedExpression e : groupBy.getKeys()) {
-          orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
-        }
+        List<Ordering> orderDefs = groupBy.getKeys().stream()
+            .map(e -> new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST))
+            .collect(Collectors.toList());
         input = new Sort(input, orderDefs, false);
       }
 
-      final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
-      return sa;
+      return new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
     }
 
     @Override
     public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException {
       PhysicalOperator input = window.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
+      final List<Ordering> ods = new ArrayList<>();
 
       input = new Sort(input, ods, false);
 
@@ -162,11 +157,7 @@ public class BasicOptimizer extends Optimizer {
     @Override
     public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException {
       final PhysicalOperator input = order.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
-      for (Ordering o : order.getOrderings()){
-        ods.add(o);
-      }
-
+      final List<Ordering> ods = new ArrayList<>(order.getOrderings());
       return new SelectionVectorRemover(new Sort(input, ods, false));
     }
 
@@ -180,18 +171,20 @@ public class BasicOptimizer extends Optimizer {
     @Override
     public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException {
       PhysicalOperator leftOp = join.getLeft().accept(this, value);
-      final List<Ordering> leftOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
-      }
+
+      List<Ordering> leftOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getLeft()))
+          .collect(Collectors.toList());
+
       leftOp = new Sort(leftOp, leftOrderDefs, false);
       leftOp = new SelectionVectorRemover(leftOp);
 
       PhysicalOperator rightOp = join.getRight().accept(this, value);
-      final List<Ordering> rightOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
-      }
+
+      List<Ordering> rightOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getRight()))
+          .collect(Collectors.toList());
+
       rightOp = new Sort(rightOp, rightOrderDefs, false);
       rightOp = new SelectionVectorRemover(rightOp);
 
@@ -210,7 +203,7 @@ public class BasicOptimizer extends Optimizer {
       try {
         final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
         final String user = userSession.getSession().getCredentials().getUserName();
-        return storagePlugin.getPhysicalScan(user, scan.getSelection());
+        return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }
@@ -241,8 +234,8 @@ public class BasicOptimizer extends Optimizer {
     }
 
     @Override
-    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) throws OptimizerException {
-      return new org.apache.drill.exec.physical.config.UnnestPOP(null, unnest.getColumn());
+    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) {
+      return new UnnestPOP(null, unnest.getColumn());
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 4ee7671..53036f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,6 +46,7 @@ public abstract class DrillTable implements Table {
   private final StoragePlugin plugin;
   private final String userName;
   private GroupScan scan;
+  private SessionOptionManager options;
 
   /**
    * Creates a DrillTable instance for a @{code TableType#Table} table.
@@ -85,12 +87,16 @@ public abstract class DrillTable implements Table {
     this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
   }
 
+  public void setOptions(SessionOptionManager options) {
+    this.options = options;
+  }
+
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
       if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
         this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot());
       } else {
-        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
+        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options);
       }
     }
     return scan;
@@ -138,7 +144,8 @@ public abstract class DrillTable implements Table {
     return true;
   }
 
-  @Override public boolean isRolledUp(String column) {
+  @Override
+  public boolean isRolledUp(String column) {
     return false;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3f65ad2..d6b0951 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -78,6 +78,7 @@ import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -113,7 +114,6 @@ public class SqlConverter {
   private final DrillConfig drillConfig;
   private RelOptCluster cluster;
 
-  private String sql;
   private VolcanoPlanner planner;
   private boolean useRootSchema = false;
 
@@ -187,12 +187,10 @@ public class SqlConverter {
 
   public SqlNode validate(final SqlNode parsedNode) {
     try {
-      SqlNode validatedNode = validator.validate(parsedNode);
-      return validatedNode;
+      return validator.validate(parsedNode);
     } catch (RuntimeException e) {
       UserException.Builder builder = UserException
-          .validationError(e)
-          .addContext("SQL Query", sql);
+          .validationError(e);
       if (isInnerQuery) {
         builder.message("Failure validating a view your query is dependent upon.");
       }
@@ -240,7 +238,7 @@ public class SqlConverter {
 
   private class DrillValidator extends SqlValidatorImpl {
 
-    protected DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
+    DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
         RelDataTypeFactory typeFactory, SqlConformance conformance) {
       super(opTab, catalogReader, typeFactory, conformance);
     }
@@ -382,15 +380,11 @@ public class SqlConverter {
 
     //To avoid unexpected column errors set a value of top to false
     final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false);
-    final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
-    return rel2;
+    return rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
   }
 
   private class Expander implements RelOptTable.ViewExpander {
 
-    public Expander() {
-    }
-
     @Override
     public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
       final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
@@ -550,7 +544,7 @@ public class SqlConverter {
      * also preserving nullability.
      *
      * <p>Tries to expand the cast, and therefore the result may be something
-     * other than a {@link RexCall} to the CAST operator, such as a
+     * other than a {@link org.apache.calcite.rex.RexCall} to the CAST operator, such as a
      * {@link RexLiteral} if {@code matchNullability} is false.
      *
      * @param type             Type to cast to
@@ -611,7 +605,7 @@ public class SqlConverter {
     /**
      * Disallow temporary tables presence in sql statement (ex: in view definitions)
      */
-    public void disallowTemporaryTables() {
+    void disallowTemporaryTables() {
       this.allowTemporaryTables = false;
     }
 
@@ -647,13 +641,19 @@ public class SqlConverter {
         }
       }
 
-      return super.getTable(names);
+      Prepare.PreparingTable table = super.getTable(names);
+      DrillTable unwrap;
+      // attach session options when the resolved table is a Drill table
+      if (table != null && (unwrap = table.unwrap(DrillTable.class)) != null) {
+        unwrap.setOptions(session.getOptions());
+      }
+      return table;
     }
 
     @Override
     public List<List<String>> getSchemaPaths() {
       if (useRootSchema) {
-        return ImmutableList.<List<String>>of(ImmutableList.<String>of());
+        return ImmutableList.of(ImmutableList.of());
       }
       return super.getSchemaPaths();
     }
@@ -662,8 +662,8 @@ public class SqlConverter {
      * check if the schema provided is a valid schema:
      * <li>schema is not indicated (only one element in the names list)<li/>
      *
-     * @param names             list of schema and table names, table name is always the last element
-     * @return throws a userexception if the schema is not valid.
+     * @param names list of schema and table names, table name is always the last element
+     * @throws UserException if the schema is not valid.
      */
     private void isValidSchema(final List<String> names) throws UserException {
       SchemaPlus defaultSchema = session.getDefaultSchema(this.rootSchema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a9c4742..a16bb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.server.options;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.DrillConfig;
@@ -40,15 +43,13 @@ import org.apache.drill.exec.util.AssertionUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
- * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
- * persist between restarts.
- */
 
 /**
+ *  <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
+ *  Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
+ *  persist between restarts.
+ *  </p>
+ *
  *  <p> All the system options are externalized into conf file. While adding a new system option
  *  a validator should be added and the default value for the option should be set in
  *  the conf files(example : drill-module.conf) under the namespace drill.exec.options.
@@ -173,6 +174,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR),
       new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
       new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
       new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL),
@@ -237,11 +239,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
-    final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap();
+    CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
+      .collect(Collectors.toMap(
+        d -> d.getValidator().getOptionName(),
+        Function.identity(),
+        (o, n) -> n,
+        CaseInsensitiveMap::newHashMap));
 
-    for (final OptionDefinition definition: definitions) {
-      map.put(definition.getValidator().getOptionName(), definition);
-    }
 
     if (AssertionUtil.isAssertionsEnabled()) {
       map.put(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, new OptionDefinition(ExecConstants.DRILLBIT_CONTROLS_VALIDATOR));
@@ -295,7 +299,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
    * Initializes this option manager.
    *
    * @return this option manager
-   * @throws Exception
+   * @throws Exception if unable to initialize option manager
    */
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
@@ -395,16 +399,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
 
   @Override
   public void deleteAllLocalOptions() {
-    final Set<String> names = Sets.newHashSet();
-    for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) {
-      names.add(entry.getKey());
-    }
-    for (final String name : names) {
-      options.delete(name); // should be lowercase
-    }
+    Iterable<Map.Entry<String, PersistedOptionValue>> allOptions = () -> options.getAll();
+    StreamSupport.stream(allOptions.spliterator(), false)
+      .map(Entry::getKey).collect(Collectors.toList()) // snapshot keys first: never delete while iterating the store
+      .forEach(name -> options.delete(name)); // should be lowercase
   }
 
-  public static CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
+  private CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
     // populate the options from the config
     final Map<String, OptionValue> defaults = new HashMap<>();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index de2b2c3..d37a0a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.PlannerPhase;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Abstract class for StorePlugin implementations.
  * See StoragePlugin for description of the interface intent and its methods.
@@ -102,12 +103,23 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     }
   }
 
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection);
+  }
+
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
 
   @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, columns);
+  }
+
+  @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 50f2731..2617065 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Interface for all implementations of the storage plugins. Different implementations of the storage
  * formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -36,18 +37,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
 
   /** Indicates if Drill can read the table from this format.
   */
-  public boolean supportsRead();
+  boolean supportsRead();
 
   /** Indicates if Drill can write a table to this format (e.g. as JSON, csv, etc.).
    */
-  public boolean supportsWrite();
+  boolean supportsWrite();
 
   /** An implementation of this method will return one or more specialized rules that Drill query
    *  optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set.
    * @return an empty set or a set of plugin specific physical optimizer rules.
    */
   @Deprecated
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
+  Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -55,9 +56,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * @param userName User whom to impersonate when when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @return The physical scan operator for the particular GroupScan (read) node.
-   * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException;
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -66,18 +76,29 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data source.
    * @return The physical scan operator for the particular GroupScan (read) node.
-   * @throws IOException
   */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException;
 
-  /** Method returns a Jackson serializable object that extends a StoragePluginConfig
-  * @return an extension of StoragePluginConfig
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data source.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException;
+
+  /**
+   * Method returns a Jackson serializable object that extends a StoragePluginConfig.
+   *
+   * @return an extension of StoragePluginConfig
   */
-  public StoragePluginConfig getConfig();
+  StoragePluginConfig getConfig();
 
   /**
    * Initialize the storage plugin. The storage plugin will not be used until this method is called.
    */
-  public void start() throws IOException;
+  void start() throws IOException;
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4b1e9dd..b0cc209 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -555,6 +555,10 @@ drill.exec.options: {
     store.hive.optimize_scan_with_native_readers: false,
     store.hive.parquet.optimize_scan_with_native_reader: false,
     store.hive.maprdb_json.optimize_scan_with_native_reader: false,
+    # Property values should NOT be enclosed in double quotes or any other quotes.
+    # Each property name and its value should be separated by =.
+    # Properties should be separated by a new line (\n).
+    store.hive.conf.properties: "",
     store.json.all_text_mode: false,
     store.json.writer.allow_nan_inf: true,
     store.json.reader.allow_nan_inf: true,