You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/09 12:17:26 UTC

[drill] branch master updated (aee899c -> e79db14)

This is an automated email from the ASF dual-hosted git repository.

arina pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from aee899c  DRILL-6570: Fixed IndexOutofBoundException in Parquet Reader
     new 139f715  DRILL-6575: Add store.hive.conf.properties option to allow set Hive properties at session level
     new 2ea603f  DRILL-6519: Add String Distance and Phonetic Functions
     new 25e65c0  DRILL-6549: batch sizing for nested loop join
     new e79db14  DRILL-6529 Project Batch Sizing causes two LargeFileCompilation tests to timeout

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java |   6 +-
 .../ConvertHiveParquetScanToDrillParquetScan.java  |  21 +-
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  17 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |  20 +-
 .../org/apache/drill/exec/store/hive/HiveScan.java | 105 +++---
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  57 ++-
 .../exec/store/hive/HiveStoragePluginConfig.java   |  21 +-
 .../apache/drill/exec/store/hive/HiveSubScan.java  |  22 +-
 .../drill/exec/store/hive/HiveUtilities.java       | 108 ++++--
 .../main/resources/bootstrap-storage-plugins.json  |   2 +-
 .../exec/TestHiveDrillNativeParquetReader.java     |  17 +
 .../apache/drill/exec/hive/TestHiveStorage.java    |  21 ++
 .../exec/hive/TestInfoSchemaOnHiveStorage.java     |   2 +
 .../exec/store/hive/HiveTestDataGenerator.java     |  80 ++--
 exec/java-exec/pom.xml                             |   6 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../drill/exec/expr/fn/impl/PhoneticFunctions.java | 407 +++++++++++++++++++++
 .../exec/expr/fn/impl/StringDistanceFunctions.java | 329 +++++++++++++++++
 .../org/apache/drill/exec/opt/BasicOptimizer.java  |  53 ++-
 .../exec/physical/impl/join/NestedLoopJoin.java    |   3 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |  62 +++-
 .../physical/impl/join/NestedLoopJoinTemplate.java |  24 +-
 .../drill/exec/planner/logical/DrillTable.java     |  11 +-
 .../drill/exec/planner/sql/SqlConverter.java       |  34 +-
 .../exec/server/options/SystemOptionManager.java   |  43 +--
 .../drill/exec/store/AbstractStoragePlugin.java    |  12 +
 .../org/apache/drill/exec/store/StoragePlugin.java |  45 ++-
 .../java-exec/src/main/resources/drill-module.conf |   4 +
 .../exec/compile/TestLargeFileCompilation.java     |  10 +-
 .../drill/exec/fn/impl/TestPhoneticFunctions.java  | 119 ++++++
 .../exec/fn/impl/TestStringDistanceFunctions.java  |  80 ++++
 .../exec/physical/unit/TestOutputBatchSize.java    | 342 ++++++++++++++++-
 .../java/org/apache/drill/test/QueryBuilder.java   |  21 ++
 exec/jdbc-all/pom.xml                              |   2 +-
 34 files changed, 1831 insertions(+), 278 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/PhoneticFunctions.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringDistanceFunctions.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestStringDistanceFunctions.java


[drill] 03/04: DRILL-6549: batch sizing for nested loop join

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 25e65c0fe2aed741b143d69c0ffbe23a3433bedc
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Thu Jun 28 03:50:36 2018 -0700

    DRILL-6549: batch sizing for nested loop join
    
    closes #1363
---
 .../exec/physical/impl/join/NestedLoopJoin.java    |   3 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |  62 +++-
 .../physical/impl/join/NestedLoopJoinTemplate.java |  24 +-
 .../exec/physical/unit/TestOutputBatchSize.java    | 342 ++++++++++++++++++++-
 4 files changed, 407 insertions(+), 24 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index f7d96ad..725c46d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -37,6 +37,9 @@ public interface NestedLoopJoin {
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing);
+
+  void setTargetOutputCount(int targetOutputCount);
+
   // Produce output records taking into account join type
   public int outputRecords(JoinRelType joinType);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index ae14fb3..e2532e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -50,8 +52,8 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
-
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -65,9 +67,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
-  // Maximum number records in the outgoing batch
-  protected static final int MAX_BATCH_SIZE = 4096;
-
   // Input indexes to correctly update the stats
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
@@ -130,6 +129,11 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
@@ -162,6 +166,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
             }
             // fall through
           case OK:
+            // For right side, use aggregate i.e. average row width across batches
+            batchMemoryManager.update(RIGHT_INDEX, 0, true);
+            logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
             addBatchToHyperContainer(right);
             break;
           case OUT_OF_MEMORY:
@@ -179,7 +186,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     }
 
     // allocate space for the outgoing batch
-    allocateVectors();
+    batchMemoryManager.allocateVectors(container);
+
+    nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
     // invoke the runtime generated method to emit records in the output batch
     outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
@@ -193,6 +202,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     container.setRecordCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    }
+
     logger.debug("Number of records emitted: " + outputRecords);
 
     return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -332,15 +345,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   }
 
   /**
-   * Simple method to allocate space for all the vectors in the container.
-   */
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vw : container) {
-      AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
-    }
-  }
-
-  /**
    * Builds the output container's schema. Goes over the left and the right
    * batch and adds the corresponding vectors to the output container.
    * @throws SchemaChangeException if batch schema was changed during execution
@@ -352,6 +356,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         return;
       }
 
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
         for (final VectorWrapper<?> vw : left) {
@@ -380,7 +387,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         addBatchToHyperContainer(right);
       }
 
-      allocateVectors();
       nljWorker = setupWorker();
 
       // if left batch is empty, fetch next
@@ -388,7 +394,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         leftUpstream = next(LEFT_INPUT, left);
       }
 
-      container.setRecordCount(0);
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     } catch (ClassTransformationException | IOException e) {
@@ -412,6 +420,26 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
 
   @Override
   public void close() {
+    updateBatchMemoryManagerStats();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+              batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
+
     rightContainer.clear();
     rightCounts.clear();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index cdd02f4..adf681b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -29,12 +29,16 @@ import javax.inject.Named;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
 /*
  * Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
  * class contains the main nested loop join logic.
  */
 public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
+
   // Current left input batch being processed
   private RecordBatch left = null;
 
@@ -50,6 +54,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
   // Iteration status tracker
   private IterationStatusTracker tracker = new IterationStatusTracker();
 
+  private int targetOutputRecords;
+
   /**
    * Method initializes necessary state and invokes the doSetup() to set the
    * input and output value vector references.
@@ -69,10 +75,14 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
     this.leftRecordCount = left.getRecordCount();
     this.rightCounts = rightCounts;
     this.outgoing = outgoing;
-
     doSetup(context, rightContainer, left, outgoing);
   }
 
+  @Override
+  public void setTargetOutputCount(int targetOutputRecords) {
+    this.targetOutputRecords = targetOutputRecords;
+  }
+
   /**
    * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
    * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
@@ -84,11 +94,11 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
     int outputIndex = 0;
     while (leftRecordCount != 0) {
       outputIndex = populateOutgoingBatch(joinType, outputIndex);
-      if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+      if (outputIndex >= targetOutputRecords) {
         break;
       }
       // reset state and get next left batch
-      resetAndGetNextLeft();
+      resetAndGetNextLeft(outputIndex);
     }
     return outputIndex;
   }
@@ -128,7 +138,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
             outputIndex++;
             rightRecordMatched = true;
 
-            if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+            if (outputIndex >= targetOutputRecords) {
               nextRightRecordToProcess++;
 
               // no more space left in the batch, stop processing
@@ -143,7 +153,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         // project records from the left side only, records from right will be null
         emitLeft(nextLeftRecordToProcess, outputIndex);
         outputIndex++;
-        if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+        if (outputIndex >= targetOutputRecords) {
           nextLeftRecordToProcess++;
 
           // no more space left in the batch, stop processing
@@ -165,7 +175,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
    * Resets some internal state which indicates the next records to process in the left and right batches,
    * also fetches the next left input batch.
    */
-  private void resetAndGetNextLeft() {
+  private void resetAndGetNextLeft(int outputIndex) {
     for (VectorWrapper<?> vw : left) {
       vw.getValueVector().clear();
     }
@@ -181,6 +191,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         leftRecordCount = 0;
         break;
       case OK:
+        setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+        logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
         leftRecordCount = left.getRecordCount();
         break;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 471f1b8..84a4fbc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -17,18 +17,23 @@
  */
 package org.apache.drill.exec.physical.unit;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.directory.api.util.Strings;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -2288,6 +2293,341 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
   }
 
   @Test
+  public void testNestedLoopJoinMultipleOutputBatches() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+    mockOpContext(nestedLoopJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
+  public void testNestedLoopJoinSingleOutputBatch() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice of total size expected.
+    // We should get 1 batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(1)  // verify number of batches
+            .expectedBatchSize(totalSize) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    LogicalExpression functionCallExpr = new FunctionCall("<",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 500;
+
+    // create left input rows like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 2, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
+
+    // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "c1", "a2", "c2")
+            .expectedNumBatches(2)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      for (long j = i+1; j < numRows+1; j++) {
+        opTestBuilder.baselineValues(5l, i, 6l, j);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 10;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+
+    // set very low value of output batch size so we can do only one row per batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(10)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testLeftNestedLoopJoin() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();
     StringBuilder batchString = new StringBuilder();


[drill] 04/04: DRILL-6529 Project Batch Sizing causes two LargeFileCompilation tests to timeout

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e79db14d81f8ee15d27cc6026bc0ee409e0c0a3c
Author: karthik <km...@maprtech.com>
AuthorDate: Fri Jun 22 18:26:37 2018 -0700

    DRILL-6529 Project Batch Sizing causes two LargeFileCompilation tests to timeout
    
    closes #1335
    
    Changed Project test columns to 10000 and reduces columns projected to SORT tests to 2500
---
 .../apache/drill/exec/compile/TestLargeFileCompilation.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 07dbae2..86ef855 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -29,7 +29,7 @@ import org.junit.rules.TestRule;
 
 @Category({SlowTest.class})
 public class TestLargeFileCompilation extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(150000); // 150secs
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200000); // 200 secs
 
   private static final String LARGE_QUERY_GROUP_BY;
 
@@ -49,7 +49,9 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   private static final int ITERATION_COUNT = Integer.valueOf(System.getProperty("TestLargeFileCompilation.iteration", "1"));
 
-  private static final int NUM_PROJECT_COLUMNS = 5000;
+  private static final int NUM_PROJECT_COLUMNS = 2500;
+
+  private static final int NUM_PROJECT_TEST_COLUMNS = 10000;
 
   private static final int NUM_ORDERBY_COLUMNS = 500;
 
@@ -77,7 +79,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
+    for (int i = 0; i < NUM_PROJECT_TEST_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`\n\n\t");
@@ -145,14 +147,12 @@ public class TestLargeFileCompilation extends BaseTestQuery {
     testNoResult(ITERATION_COUNT, LARGE_QUERY_GROUP_BY);
   }
 
-  @Ignore // TODO: DRILL-6529
   @Test
   public void testEXTERNAL_SORT() throws Exception {
     testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
     testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY);
   }
 
-  @Ignore // TODO: DRILL-6529
   @Test
   public void testTOP_N_SORT() throws Exception {
     testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);


[drill] 01/04: DRILL-6575: Add store.hive.conf.properties option to allow set Hive properties at session level

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 139f7156bcb2f6fef5b36f116c9c1b6095fc4b9c
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Thu Jul 5 20:48:37 2018 +0000

    DRILL-6575: Add store.hive.conf.properties option to allow set Hive properties at session level
    
    closes #1365
---
 ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java |   6 +-
 .../ConvertHiveParquetScanToDrillParquetScan.java  |  21 ++--
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  17 +++-
 .../store/hive/HiveDrillNativeParquetScan.java     |  20 +++-
 .../org/apache/drill/exec/store/hive/HiveScan.java | 105 ++++++++++----------
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  57 ++++++++---
 .../exec/store/hive/HiveStoragePluginConfig.java   |  21 ++--
 .../apache/drill/exec/store/hive/HiveSubScan.java  |  22 +++--
 .../drill/exec/store/hive/HiveUtilities.java       | 108 ++++++++++++++++-----
 .../main/resources/bootstrap-storage-plugins.json  |   2 +-
 .../exec/TestHiveDrillNativeParquetReader.java     |  17 ++++
 .../apache/drill/exec/hive/TestHiveStorage.java    |  21 ++++
 .../exec/hive/TestInfoSchemaOnHiveStorage.java     |   2 +
 .../exec/store/hive/HiveTestDataGenerator.java     |  80 ++++++++-------
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../org/apache/drill/exec/opt/BasicOptimizer.java  |  53 +++++-----
 .../drill/exec/planner/logical/DrillTable.java     |  11 ++-
 .../drill/exec/planner/sql/SqlConverter.java       |  34 +++----
 .../exec/server/options/SystemOptionManager.java   |  43 ++++----
 .../drill/exec/store/AbstractStoragePlugin.java    |  12 +++
 .../org/apache/drill/exec/store/StoragePlugin.java |  45 ++++++---
 .../java-exec/src/main/resources/drill-module.conf |   4 +
 22 files changed, 457 insertions(+), 247 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index 50fee9c..3bc33b3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -88,7 +88,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
       HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
       HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
       HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
-          hiveScan.getStoragePlugin().getHiveConf());
+          hiveScan.getHiveConf());
       if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
         // table is empty, use original scan
         return;
@@ -134,7 +134,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
     List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
         .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
         .collect(Collectors.toList());
-    JsonTableGroupScan nariveMapRDBScan =
+    JsonTableGroupScan nativeMapRDBScan =
         new JsonTableGroupScan(
             hiveScan.getUserName(),
             hiveScan.getStoragePlugin(),
@@ -155,7 +155,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
         hiveScanRel.getCluster(),
         hiveScanRel.getTraitSet(),
         hiveScanRel.getTable(),
-        nariveMapRDBScan,
+        nativeMapRDBScan,
         nativeScanRowType,
         hiveScanCols);
   }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 2a2f4fb..ea71157 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.planner.sql.logical;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -43,6 +41,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -88,7 +88,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
       final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
       final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
 
-      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+      final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf());
       final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
 
       if (logicalInputSplits.isEmpty()) {
@@ -123,7 +123,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
    * Create mapping of Hive partition column to directory column mapping.
    */
   private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
-    final Map<String, String> partitionColMapping = Maps.newHashMap();
+    final Map<String, String> partitionColMapping = new HashMap<>();
     int i = 0;
     for (FieldSchema col : hiveTable.getPartitionKeys()) {
       partitionColMapping.put(col.getName(), partitionColumnLabel+i);
@@ -143,8 +143,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
 
-    final List<String> nativeScanColNames = Lists.newArrayList();
-    final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+    final List<String> nativeScanColNames = new ArrayList<>();
+    final List<RelDataType> nativeScanColTypes = new ArrayList<>();
     for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
       final String dirColName = partitionColMapping.get(field.getName());
       if (dirColName != null) { // partition column
@@ -161,8 +161,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     // Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of
     // columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is,
     // unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'.
-    final List<SchemaPath> nativeScanCols = Lists.newArrayList();
-    for(SchemaPath colName : hiveScanRel.getColumns()) {
+    final List<SchemaPath> nativeScanCols = new ArrayList<>();
+    for (SchemaPath colName : hiveScanRel.getColumns()) {
       final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath());
       if (partitionCol != null) {
         nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
@@ -177,7 +177,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
             hiveScan.getUserName(),
             nativeScanCols,
             hiveScan.getStoragePlugin(),
-            logicalInputSplits);
+            logicalInputSplits,
+            hiveScan.getConfProperties());
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
@@ -194,7 +195,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
   private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
       final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
 
-    final List<RexNode> rexNodes = Lists.newArrayList();
+    final List<RexNode> rexNodes = new ArrayList<>();
     final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
     final RelDataType hiveScanRowType = hiveScanRel.getRowType();
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index e227015..d334ec8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 @JsonTypeName("hive-drill-native-parquet-row-group-scan")
 public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan {
@@ -45,6 +46,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -53,12 +55,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
                                             @JsonProperty("columns") List<SchemaPath> columns,
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                            @JsonProperty("confProperties") Map<String, String> confProperties,
                                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
         columns,
         hivePartitionHolder,
+        confProperties,
         filter);
   }
 
@@ -67,11 +71,13 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             List<RowGroupReadEntry> rowGroupReadEntries,
                                             List<SchemaPath> columns,
                                             HivePartitionHolder hivePartitionHolder,
+                                            Map<String, String> confProperties,
                                             LogicalExpression filter) {
     super(userName, rowGroupReadEntries, columns, filter);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
   }
 
   @JsonProperty
@@ -84,6 +90,11 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getHiveStoragePlugin() {
     return hiveStoragePlugin;
@@ -92,7 +103,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
   }
 
   @Override
@@ -102,7 +113,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
   }
 
   @Override
@@ -114,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
     Path path = new Path(rowGroupReadEntry.getPath()).getParent();
     return new ProjectionPusher().pushProjectionsAndFilters(
-        new JobConf(hiveStoragePlugin.getHiveConf()),
+        new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
         path.getParent());
   }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 03a80d3..a973fa1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -62,7 +62,8 @@ import java.util.Map;
 public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
 
   private final HiveStoragePlugin hiveStoragePlugin;
-  private HivePartitionHolder hivePartitionHolder;
+  private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -71,10 +72,12 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     @JsonProperty("columns") List<SchemaPath> columns,
                                     @JsonProperty("entries") List<ReadEntryWithPath> entries,
                                     @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                    @JsonProperty("confProperties") Map<String, String> confProperties,
                                     @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
     this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
 
     init();
   }
@@ -82,19 +85,22 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
-                                    List<LogicalInputSplit> logicalInputSplits) throws IOException {
-    this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
+                                    List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties,
                                     LogicalExpression filter) throws IOException {
     super(userName, columns, new ArrayList<>(), filter);
 
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.hivePartitionHolder = new HivePartitionHolder();
+    this.confProperties = confProperties;
 
     for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
       Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
@@ -122,6 +128,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
     super(that);
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.hivePartitionHolder = that.hivePartitionHolder;
+    this.confProperties = that.confProperties;
   }
 
   @JsonProperty
@@ -134,6 +141,11 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
     List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
@@ -142,7 +154,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       List<String> values = hivePartitionHolder.get(readEntry.getPath());
       subPartitionHolder.add(readEntry.getPath(), values);
     }
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 11d47f3..d631740 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -50,8 +50,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
 
@@ -64,6 +62,7 @@ public class HiveScan extends AbstractGroupScan {
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveReadEntry hiveReadEntry;
   private final HiveMetadataProvider metadataProvider;
+  private final Map<String, String> confProperties;
 
   private List<List<LogicalInputSplit>> mappings;
   private List<LogicalInputSplit> inputSplits;
@@ -75,22 +74,24 @@ public class HiveScan extends AbstractGroupScan {
                   @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
                   @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JsonProperty("confProperties") final Map<String, String> confProperties,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
         (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
         columns,
-        null);
+        null, confProperties);
   }
 
   public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
-      final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException {
+                  final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
     if (metadataProvider == null) {
-      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf());
+      this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
     } else {
       this.metadataProvider = metadataProvider;
     }
@@ -102,10 +103,11 @@ public class HiveScan extends AbstractGroupScan {
     this.hiveReadEntry = that.hiveReadEntry;
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
+    this.confProperties = that.confProperties;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
   }
 
   @JsonProperty
@@ -123,29 +125,37 @@ public class HiveScan extends AbstractGroupScan {
     return columns;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getStoragePlugin() {
     return hiveStoragePlugin;
   }
 
-  protected HiveMetadataProvider getMetadataProvider() {
-    return metadataProvider;
+  @JsonIgnore
+  public HiveConf getHiveConf() {
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
   }
 
-  private List<LogicalInputSplit> getInputSplits() {
-    if (inputSplits == null) {
-      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
-    }
-
-    return inputSplits;
+  @JsonIgnore
+  public boolean isNativeReader() {
+    return false;
   }
 
+  @Override
+  public boolean supportsPartitionFilterPushdown() {
+    List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+    return !(partitionKeys == null || partitionKeys.size() == 0);
+  }
 
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = new ArrayList<>();
     for (int i = 0; i < endpoints.size(); i++) {
-      mappings.add(new ArrayList<LogicalInputSplit>());
+      mappings.add(new ArrayList<>());
     }
     final int count = endpoints.size();
     final List<LogicalInputSplit> inputSplits = getInputSplits();
@@ -158,9 +168,9 @@ public class HiveScan extends AbstractGroupScan {
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
       final List<LogicalInputSplit> splits = mappings.get(minorFragmentId);
-      List<HivePartitionWrapper> parts = Lists.newArrayList();
-      final List<List<String>> encodedInputSplits = Lists.newArrayList();
-      final List<String> splitTypes = Lists.newArrayList();
+      List<HivePartitionWrapper> parts = new ArrayList<>();
+      final List<List<String>> encodedInputSplits = new ArrayList<>();
+      final List<String> splitTypes = new ArrayList<>();
       for (final LogicalInputSplit split : splits) {
         final Partition splitPartition = split.getPartition();
         if (splitPartition != null) {
@@ -176,7 +186,7 @@ public class HiveScan extends AbstractGroupScan {
       }
 
       final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -192,7 +202,7 @@ public class HiveScan extends AbstractGroupScan {
     final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) {
       endpointMap.put(endpoint.getAddress(), endpoint);
-      logger.debug("endpoing address: {}", endpoint.getAddress());
+      logger.debug("Endpoint address: {}", endpoint.getAddress());
     }
     final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
@@ -204,7 +214,7 @@ public class HiveScan extends AbstractGroupScan {
       for (final LogicalInputSplit split : inputSplits) {
         final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
         for (final String loc : split.getLocations()) {
-          logger.debug("split location: {}", loc);
+          logger.debug("Split location: {}", loc);
           final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
@@ -218,13 +228,8 @@ public class HiveScan extends AbstractGroupScan {
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
-      Preconditions.checkNotNull(ep);
-    }
-    for (final EndpointAffinity a : affinityMap.values()) {
-      Preconditions.checkNotNull(a.getEndpoint());
-    }
-    return Lists.newArrayList(affinityMap.values());
+
+    return new ArrayList<>(affinityMap.values());
   }
 
   @Override
@@ -243,21 +248,8 @@ public class HiveScan extends AbstractGroupScan {
     }
   }
 
-  protected int getSerDeOverheadFactor() {
-    final int projectedColumnCount;
-    if (Utilities.isStarQuery(columns)) {
-      Table hiveTable = hiveReadEntry.getTable();
-      projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
-    } else {
-      // In cost estimation, # of project columns should be >= 1, even for skipAll query.
-      projectedColumnCount = Math.max(columns.size(), 1);
-    }
-
-    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
-  }
-
   @Override
-  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) {
     return new HiveScan(this);
   }
 
@@ -275,6 +267,7 @@ public class HiveScan extends AbstractGroupScan {
         + ", numPartitions=" + numPartitions
         + ", partitions= " + partitions
         + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
+        + ", confProperties=" + confProperties
         + "]";
   }
 
@@ -290,22 +283,24 @@ public class HiveScan extends AbstractGroupScan {
     return true;
   }
 
-  // Return true if the current table is partitioned false otherwise
-  public boolean supportsPartitionFilterPushdown() {
-    final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
-    if (partitionKeys == null || partitionKeys.size() == 0) {
-      return false;
+  private List<LogicalInputSplit> getInputSplits() {
+    if (inputSplits == null) {
+      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
-    return true;
-  }
 
-  @JsonIgnore
-  public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return inputSplits;
   }
 
-  @JsonIgnore
-  public boolean isNativeReader() {
-    return false;
+  private int getSerDeOverheadFactor() {
+    final int projectedColumnCount;
+    if (Utilities.isStarQuery(columns)) {
+      Table hiveTable = hiveReadEntry.getTable();
+      projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
+    } else {
+      // In cost estimation, # of project columns should be >= 1, even for skipAll query.
+      projectedColumnCount = Math.max(columns.size(), 1);
+    }
+
+    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
   }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index ced8b01..adf1348 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -18,27 +18,34 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -53,17 +60,16 @@ import org.apache.thrift.transport.TTransportException;
 
 public class HiveStoragePlugin extends AbstractStoragePlugin {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
 
   private final HiveStoragePluginConfig config;
   private HiveSchemaFactory schemaFactory;
   private final HiveConf hiveConf;
 
-  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name)
-      throws ExecutionSetupException {
+  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
     super(context, name);
     this.config = config;
-    this.hiveConf = createHiveConf(config.getHiveConfigOverride());
+    this.hiveConf = HiveUtilities.generateHiveConf(config.getConfigProps());
     this.schemaFactory = new HiveSchemaFactory(this, name, hiveConf);
   }
 
@@ -76,7 +82,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+  }
+
+  @Override
   public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
     HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
     try {
       if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -84,7 +100,26 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             "Querying views created in Hive from Drill is not supported in current version.");
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns, null);
+      Map<String, String> confProperties = new HashMap<>();
+      if (options != null) {
+        String value = StringEscapeUtils.unescapeJava(options.getString(ExecConstants.HIVE_CONF_PROPERTIES));
+        logger.trace("[{}] is set to {}.", ExecConstants.HIVE_CONF_PROPERTIES, value);
+        try {
+          Properties properties = new Properties();
+          properties.load(new StringReader(value));
+          confProperties =
+            properties.stringPropertyNames().stream()
+              .collect(
+                Collectors.toMap(
+                  Function.identity(),
+                  properties::getProperty,
+                  (o, n) -> n));
+          } catch (IOException e) {
+            logger.warn("Unable to parse Hive conf properties {}, ignoring them.", value);
+        }
+      }
+
+      return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
@@ -181,14 +216,4 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
     return ruleBuilder.build();
   }
 
-  private static HiveConf createHiveConf(final Map<String, String> hiveConfigOverride) {
-    final HiveConf hiveConf = new HiveConf();
-    for(Entry<String, String> config : hiveConfigOverride.entrySet()) {
-      final String key = config.getKey();
-      final String value = config.getValue();
-      hiveConf.set(key, value);
-      logger.trace("HiveConfig Override {}={}", key, value);
-    }
-    return hiveConf;
-  }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index b6f15c8..d812468 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -19,28 +19,31 @@ package org.apache.drill.exec.store.hive;
 
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonAlias;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(HiveStoragePluginConfig.NAME)
 public class HiveStoragePluginConfig extends StoragePluginConfigBase {
-  @JsonProperty
-  public Map<String, String> configProps;
 
   public static final String NAME = "hive";
 
-  @JsonIgnore
-  public Map<String, String> getHiveConfigOverride() {
-    return configProps;
-  }
+  private final Map<String, String> configProps;
 
   @JsonCreator
-  public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
-    this.configProps = props;
+  public HiveStoragePluginConfig(@JsonProperty("configProps")
+                                 // previously two names were allowed due to incorrectly written ser / der logic
+                                 // allowing to use both during deserialization for backward compatibility
+                                 @JsonAlias("config") Map<String, String> configProps) {
+    this.configProps = configProps;
+  }
+
+  @JsonProperty
+  public Map<String, String> getConfigProps() {
+    return configProps;
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 8ca8647..0acec2d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.collect.ImmutableSet;
@@ -55,6 +56,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private final HiveTableWithColumnCache table;
   private final List<HivePartition> partitions;
   private final List<SchemaPath> columns;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@@ -63,22 +65,24 @@ public class HiveSubScan extends AbstractBase implements SubScan {
                      @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
-                     @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig)
+                     @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
+                     @JsonProperty("confProperties") Map<String, String> confProperties)
       throws IOException, ExecutionSetupException, ReflectiveOperationException {
     this(userName,
         splits,
         hiveReadEntry,
         splitClasses,
         columns,
-        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig));
+        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties);
   }
 
   public HiveSubScan(final String userName,
                      final List<List<String>> splits,
                      final HiveReadEntry hiveReadEntry,
-                      final List<String> splitClasses,
+                     final List<String> splitClasses,
                      final List<SchemaPath> columns,
-                     final HiveStoragePlugin hiveStoragePlugin)
+                     final HiveStoragePlugin hiveStoragePlugin,
+                     final Map<String, String> confProperties)
     throws IOException, ReflectiveOperationException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
@@ -88,6 +92,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     this.splitClasses = splitClasses;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
 
     for (int i = 0; i < splits.size(); i++) {
       inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i)));
@@ -119,6 +124,11 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return hiveStoragePlugin.getConfig();
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveTableWithColumnCache getTable() {
     return table;
@@ -141,7 +151,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
 
   @JsonIgnore
   public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties);
   }
 
   @Override
@@ -152,7 +162,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c8efb65..6fc567e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -93,7 +93,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_
 public class HiveUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
 
-  /** Partition value is received in string format. Convert it into appropriate object based on the type. */
+  /**
+   * Partition value is received in string format. Convert it into appropriate object based on the type.
+   *
+   * @param typeInfo type info
+   * @param value partition values
+   * @param defaultPartitionValue default partition value
+   * @return converted object
+   */
   public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
     if (typeInfo.getCategory() != Category.PRIMITIVE) {
       // In Hive only primitive types are allowed as partition column types.
@@ -147,6 +154,15 @@ public class HiveUtilities {
     return null;
   }
 
+  /**
+   * Populates vector with given value based on its type.
+   *
+   * @param vector vector instance
+   * @param managedBuffer Drill duffer
+   * @param val value
+   * @param start start position
+   * @param end end position
+   */
   public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val,
       final int start, final int end) {
     TypeProtos.MinorType type = vector.getField().getType().getMinorType();
@@ -307,6 +323,13 @@ public class HiveUtilities {
     }
   }
 
+  /**
+   * Obtains major type from given type info holder.
+   *
+   * @param typeInfo type info holder
+   * @param options session options
+   * @return appropriate major type, null otherwise. For some types may throw unsupported exception.
+   */
   public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE: {
@@ -343,8 +366,14 @@ public class HiveUtilities {
     return null;
   }
 
-  public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
-                                                                           OptionSet options) {
+  /**
+   * Obtains minor type from given primitive type info holder.
+   *
+   * @param primitiveTypeInfo primitive type info holder
+   * @param options session options
+   * @return appropriate minor type, otherwise throws unsupported type exception
+   */
+  public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, OptionSet options) {
     switch(primitiveTypeInfo.getPrimitiveCategory()) {
       case BINARY:
         return TypeProtos.MinorType.VARBINARY;
@@ -392,10 +421,8 @@ public class HiveUtilities {
    * @param job {@link JobConf} instance needed incase the table is StorageHandler based table.
    * @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables).
    * @param table Table object
-   * @throws Exception
    */
-  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd,
-      final Table table) throws Exception {
+  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table) throws Exception {
     final String inputFormatName = sd.getInputFormat();
     if (Strings.isNullOrEmpty(inputFormatName)) {
       final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
@@ -426,26 +453,23 @@ public class HiveUtilities {
   }
 
   /**
-   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table
-   * to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
+   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)}
+   * which also adds parameters from table to properties returned by that method.
    *
    * @param partition the source of partition level parameters
    * @param table     the source of table level parameters
    * @return properties
    */
   public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
-    final Properties properties;
     restoreColumns(table, partition);
-    properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+    Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
 
     // SerDe expects properties from Table, but above call doesn't add Table properties.
     // Include Table properties in final list in order to not to break SerDes that depend on
     // Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
-    for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
-      if (entry.getKey() != null && entry.getKey() != null) {
-        properties.put(entry.getKey(), entry.getValue());
-      }
-    }
+    table.getParameters().entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> properties.put(e.getKey(), e.getValue()));
 
     return properties;
   }
@@ -453,8 +477,8 @@ public class HiveUtilities {
   /**
    * Sets columns from table cache to table and partition.
    *
+   * @param table the source of column lists cache
    * @param partition partition which will set column list
-   * @param table     the source of column lists cache
    */
   public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
     // exactly the same column lists for partitions or table
@@ -471,6 +495,9 @@ public class HiveUtilities {
    * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
    * which also sets columns from table cache to table and returns properties returned by
    * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}.
+   *
+   * @param table Hive table with cached columns
+   * @return Hive table metadata
    */
   public static Properties getTableMetadata(HiveTableWithColumnCache table) {
     restoreColumns(table, null);
@@ -478,13 +505,18 @@ public class HiveUtilities {
       table.getDbName(), table.getTableName(), table.getPartitionKeys());
   }
 
+  /**
+   * Generates unsupported types exception message with list of supported types
+   * and throws user exception.
+   *
+   * @param unsupportedType unsupported type
+   */
   public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
-    StringBuilder errMsg = new StringBuilder();
-    errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));
-    errMsg.append(System.getProperty("line.separator"));
-    errMsg.append("Following Hive data types are supported in Drill for querying: ");
-    errMsg.append(
-        "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
+    StringBuilder errMsg = new StringBuilder()
+        .append("Unsupported Hive data type ").append(unsupportedType).append(". ")
+        .append(System.lineSeparator())
+        .append("Following Hive data types are supported in Drill for querying: ")
+        .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
 
     throw UserException.unsupportedError()
         .message(errMsg.toString())
@@ -633,7 +665,7 @@ public class HiveUtilities {
   }
 
   /**
-   * Get the input format from given {@link StorageDescriptor}
+   * Get the input format from given {@link StorageDescriptor}.
    *
    * @param properties table properties
    * @param hiveReadEntry hive read entry
@@ -681,5 +713,35 @@ public class HiveUtilities {
     }
     return false;
   }
+
+  /**
+   * Creates HiveConf based on given list of configuration properties.
+   *
+   * @param properties config properties
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(Map<String, String> properties) {
+    logger.trace("Override HiveConf with the following properties {}", properties);
+    HiveConf hiveConf = new HiveConf();
+    properties.forEach(hiveConf::set);
+    return hiveConf;
+  }
+
+  /**
+   * Creates HiveConf based on properties in given HiveConf and configuration properties.
+   *
+   * @param hiveConf hive conf
+   * @param properties config properties
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> properties) {
+    Properties changedProperties = hiveConf.getChangedProperties();
+    changedProperties.putAll(properties);
+    HiveConf newHiveConf = new HiveConf();
+    changedProperties.stringPropertyNames()
+        .forEach(name -> newHiveConf.set(name, changedProperties.getProperty(name)));
+    return newHiveConf;
+  }
+
 }
 
diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
index d06220f..018189c 100644
--- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,7 @@
   "storage":{
     hive : {
       type:"hive",
-      config : {
+      configProps : {
         "hive.metastore.uris" : "",
         "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true",
         "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 556deb2..ea8d5df 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -152,6 +152,12 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
     // checks only group scan
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
   }
 
   @Test
@@ -243,4 +249,15 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
     testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
   }
 
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 25393e7..94f39b8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -412,6 +412,27 @@ public class TestHiveStorage extends HiveTestBase {
   public void testPhysicalPlanSubmission() throws Exception {
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.readtest");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(containsString("IOException: Not a file"));
+    test(query);
   }
 
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 80da976..c5c0d48 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -50,6 +50,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("hive.default", "partition_with_few_schemas")
         .baselineValues("hive.default", "kv_native")
         .baselineValues("hive.default", "kv_native_ext")
+        .baselineValues("hive.default", "sub_dir_table")
         .go();
 
     testBuilder()
@@ -254,6 +255,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
+        .baselineValues("DRILL", "hive.default", "sub_dir_table", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index f206999..074cb3b 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -25,6 +25,7 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -80,7 +80,7 @@ public class HiveTestDataGenerator {
     this.whDir = whDir;
     this.dirTestWatcher = dirTestWatcher;
 
-    config = Maps.newHashMap();
+    config = new HashMap<>();
     config.put(ConfVars.METASTOREURIS.toString(), "");
     config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
     config.put("hive.metastore.warehouse.dir", whDir);
@@ -89,7 +89,9 @@ public class HiveTestDataGenerator {
 
   /**
    * Add Hive test storage plugin to the given plugin registry.
-   * @throws Exception
+   *
+   * @param pluginRegistry storage plugin registry
+   * @throws Exception in case if unable to update Hive storage plugin
    */
   public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) throws Exception {
     HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
@@ -101,7 +103,8 @@ public class HiveTestDataGenerator {
   /**
    * Update the current HiveStoragePlugin in given plugin registry with given <i>configOverride</i>.
    *
-   * @param configOverride
+   * @param pluginRegistry storage plugin registry
+   * @param configOverride config properties to be overridden
    * @throws DrillException if fails to update or no Hive plugin currently exists in given plugin registry.
    */
   public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, Map<String, String> configOverride)
@@ -113,7 +116,7 @@ public class HiveTestDataGenerator {
     }
 
     HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
-    newPluginConfig.getHiveConfigOverride().putAll(configOverride);
+    newPluginConfig.getConfigProps().putAll(configOverride);
 
     pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true);
   }
@@ -344,7 +347,7 @@ public class HiveTestDataGenerator {
         "charType CHAR(10))"
     );
 
-    /**
+    /*
      * Create a PARQUET table with all supported types.
      */
     executeQuery(hiveDriver,
@@ -542,6 +545,8 @@ public class HiveTestDataGenerator {
 
     createTestDataForDrillNativeParquetReaderTests(hiveDriver);
 
+    createSubDirTable(hiveDriver, testDataFile);
+
     ss.close();
   }
 
@@ -594,56 +599,61 @@ public class HiveTestDataGenerator {
       "location '%s'", thirdPartition));
   }
 
+  private void createSubDirTable(Driver hiveDriver, String testDataFile) {
+    String tableName = "sub_dir_table";
+    dirTestWatcher.copyResourceToRoot(Paths.get(testDataFile), Paths.get(tableName, "sub_dir", "data.txt"));
+
+    String tableLocation = Paths.get(dirTestWatcher.getRootDir().toURI().getPath(), tableName).toUri().getPath();
+
+    String tableDDL = String.format("create external table sub_dir_table (key int, value string) " +
+        "row format delimited fields terminated by ',' stored as textfile location '%s'", tableLocation);
+    executeQuery(hiveDriver, tableDDL);
+  }
+
   private File getTempFile() throws Exception {
     return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile();
   }
 
   private String generateTestDataFile() throws Exception {
-    final File file = getTempFile();
-    PrintWriter printWriter = new PrintWriter(file);
-    for (int i=1; i<=5; i++) {
-      printWriter.println (String.format("%d, key_%d", i, i));
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      for (int i = 1; i <= 5; i++) {
+        printWriter.println(String.format("%d, key_%d", i, i));
+      }
     }
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateTestDataFileForPartitionInput() throws Exception {
-    final File file = getTempFile();
-
-    PrintWriter printWriter = new PrintWriter(file);
-
-    String partValues[] = {"1", "2", "null"};
-
-    for(int c = 0; c < partValues.length; c++) {
-      for(int d = 0; d < partValues.length; d++) {
-        for(int e = 0; e < partValues.length; e++) {
-          for (int i = 1; i <= 5; i++) {
-            Date date = new Date(System.currentTimeMillis());
-            Timestamp ts = new Timestamp(System.currentTimeMillis());
-            printWriter.printf("%s,%s,%s,%s,%s",
-                date.toString(), ts.toString(), partValues[c], partValues[d], partValues[e]);
-            printWriter.println();
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      String partValues[] = {"1", "2", "null"};
+      for (String partValue : partValues) {
+        for (String partValue1 : partValues) {
+          for (String partValue2 : partValues) {
+            for (int i = 1; i <= 5; i++) {
+              Date date = new Date(System.currentTimeMillis());
+              Timestamp ts = new Timestamp(System.currentTimeMillis());
+              printWriter.printf("%s,%s,%s,%s,%s", date.toString(), ts.toString(), partValue, partValue1, partValue2);
+              printWriter.println();
+            }
           }
         }
       }
     }
 
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateAllTypesDataFile() throws Exception {
     File file = getTempFile();
 
-    PrintWriter printWriter = new PrintWriter(file);
-    printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689," +
-        "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield," +
-        "2013-07-05 17:01:00,2013-07-05,charfield");
-    printWriter.println(",,,,,,,,,,,,,,,,");
-    printWriter.close();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"+
+          "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"+
+          "2013-07-05 17:01:00,2013-07-05,charfield");
+      printWriter.println(",,,,,,,,,,,,,,,,");
+    }
 
     return file.getPath();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 49f149b..4c840a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -410,6 +410,9 @@ public final class ExecConstants {
   public static final OptionValidator HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR =
       new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
 
+  public static final String HIVE_CONF_PROPERTIES = "store.hive.conf.properties";
+  public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new StringValidator(HIVE_CONF_PROPERTIES);
+
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final long SLICE_TARGET_DEFAULT = 100000l;
   public static final PositiveLongValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 36e74a0..e7518b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.opt;
 
-import com.google.common.collect.Lists;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -28,9 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
 import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.logical.data.Project;
@@ -53,6 +49,7 @@ import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -64,6 +61,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class BasicOptimizer extends Optimizer {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -99,8 +97,7 @@ public class BasicOptimizer extends Optimizer {
         .version(logicalProperties.version)
         .generator(logicalProperties.generator)
         .options(new JSONOptions(context.getOptions().getOptionList())).build();
-    final PhysicalPlan p = new PhysicalPlan(props, physOps);
-    return p;
+    return new PhysicalPlan(props, physOps);
   }
 
   public static class BasicOptimizationContext implements OptimizationContext {
@@ -128,30 +125,28 @@ public class BasicOptimizer extends Optimizer {
      */
     private final LogicalPlan logicalPlan;
 
-    public LogicalConverter(final LogicalPlan logicalPlan) {
+    LogicalConverter(final LogicalPlan logicalPlan) {
       this.logicalPlan = logicalPlan;
     }
 
     @Override
     public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-      final List<Ordering> orderDefs = Lists.newArrayList();
       PhysicalOperator input = groupBy.getInput().accept(this, value);
 
       if (groupBy.getKeys().size() > 0) {
-        for(NamedExpression e : groupBy.getKeys()) {
-          orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
-        }
+        List<Ordering> orderDefs = groupBy.getKeys().stream()
+            .map(e -> new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST))
+            .collect(Collectors.toList());
         input = new Sort(input, orderDefs, false);
       }
 
-      final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
-      return sa;
+      return new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
     }
 
     @Override
     public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException {
       PhysicalOperator input = window.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
+      final List<Ordering> ods = new ArrayList<>();
 
       input = new Sort(input, ods, false);
 
@@ -162,11 +157,7 @@ public class BasicOptimizer extends Optimizer {
     @Override
     public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException {
       final PhysicalOperator input = order.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
-      for (Ordering o : order.getOrderings()){
-        ods.add(o);
-      }
-
+      final List<Ordering> ods = new ArrayList<>(order.getOrderings());
       return new SelectionVectorRemover(new Sort(input, ods, false));
     }
 
@@ -180,18 +171,20 @@ public class BasicOptimizer extends Optimizer {
     @Override
     public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException {
       PhysicalOperator leftOp = join.getLeft().accept(this, value);
-      final List<Ordering> leftOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
-      }
+
+      List<Ordering> leftOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getLeft()))
+          .collect(Collectors.toList());
+
       leftOp = new Sort(leftOp, leftOrderDefs, false);
       leftOp = new SelectionVectorRemover(leftOp);
 
       PhysicalOperator rightOp = join.getRight().accept(this, value);
-      final List<Ordering> rightOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
-      }
+
+      List<Ordering> rightOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getRight()))
+          .collect(Collectors.toList());
+
       rightOp = new Sort(rightOp, rightOrderDefs, false);
       rightOp = new SelectionVectorRemover(rightOp);
 
@@ -210,7 +203,7 @@ public class BasicOptimizer extends Optimizer {
       try {
         final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
         final String user = userSession.getSession().getCredentials().getUserName();
-        return storagePlugin.getPhysicalScan(user, scan.getSelection());
+        return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }
@@ -241,8 +234,8 @@ public class BasicOptimizer extends Optimizer {
     }
 
     @Override
-    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) throws OptimizerException {
-      return new org.apache.drill.exec.physical.config.UnnestPOP(null, unnest.getColumn());
+    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) {
+      return new UnnestPOP(null, unnest.getColumn());
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 4ee7671..53036f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,6 +46,7 @@ public abstract class DrillTable implements Table {
   private final StoragePlugin plugin;
   private final String userName;
   private GroupScan scan;
+  private SessionOptionManager options;
 
   /**
    * Creates a DrillTable instance for a @{code TableType#Table} table.
@@ -85,12 +87,16 @@ public abstract class DrillTable implements Table {
     this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
   }
 
+  public void setOptions(SessionOptionManager options) {
+    this.options = options;
+  }
+
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
       if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
         this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot());
       } else {
-        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
+        this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options);
       }
     }
     return scan;
@@ -138,7 +144,8 @@ public abstract class DrillTable implements Table {
     return true;
   }
 
-  @Override public boolean isRolledUp(String column) {
+  @Override
+  public boolean isRolledUp(String column) {
     return false;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3f65ad2..d6b0951 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -78,6 +78,7 @@ import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -113,7 +114,6 @@ public class SqlConverter {
   private final DrillConfig drillConfig;
   private RelOptCluster cluster;
 
-  private String sql;
   private VolcanoPlanner planner;
   private boolean useRootSchema = false;
 
@@ -187,12 +187,10 @@ public class SqlConverter {
 
   public SqlNode validate(final SqlNode parsedNode) {
     try {
-      SqlNode validatedNode = validator.validate(parsedNode);
-      return validatedNode;
+      return validator.validate(parsedNode);
     } catch (RuntimeException e) {
       UserException.Builder builder = UserException
-          .validationError(e)
-          .addContext("SQL Query", sql);
+          .validationError(e);
       if (isInnerQuery) {
         builder.message("Failure validating a view your query is dependent upon.");
       }
@@ -240,7 +238,7 @@ public class SqlConverter {
 
   private class DrillValidator extends SqlValidatorImpl {
 
-    protected DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
+    DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
         RelDataTypeFactory typeFactory, SqlConformance conformance) {
       super(opTab, catalogReader, typeFactory, conformance);
     }
@@ -382,15 +380,11 @@ public class SqlConverter {
 
     //To avoid unexpected column errors set a value of top to false
     final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false);
-    final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
-    return rel2;
+    return rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
   }
 
   private class Expander implements RelOptTable.ViewExpander {
 
-    public Expander() {
-    }
-
     @Override
     public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
       final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
@@ -550,7 +544,7 @@ public class SqlConverter {
      * also preserving nullability.
      *
      * <p>Tries to expand the cast, and therefore the result may be something
-     * other than a {@link RexCall} to the CAST operator, such as a
+     * other than a {@link org.apache.calcite.rex.RexCall} to the CAST operator, such as a
      * {@link RexLiteral} if {@code matchNullability} is false.
      *
      * @param type             Type to cast to
@@ -611,7 +605,7 @@ public class SqlConverter {
     /**
      * Disallow temporary tables presence in sql statement (ex: in view definitions)
      */
-    public void disallowTemporaryTables() {
+    void disallowTemporaryTables() {
       this.allowTemporaryTables = false;
     }
 
@@ -647,13 +641,19 @@ public class SqlConverter {
         }
       }
 
-      return super.getTable(names);
+      Prepare.PreparingTable table = super.getTable(names);
+      DrillTable unwrap;
+      // add session options if found table is Drill table
+      if (table != null && (unwrap = table.unwrap(DrillTable.class)) != null) {
+        unwrap.setOptions(session.getOptions());
+      }
+      return table;
     }
 
     @Override
     public List<List<String>> getSchemaPaths() {
       if (useRootSchema) {
-        return ImmutableList.<List<String>>of(ImmutableList.<String>of());
+        return ImmutableList.of(ImmutableList.of());
       }
       return super.getSchemaPaths();
     }
@@ -662,8 +662,8 @@ public class SqlConverter {
      * check if the schema provided is a valid schema:
      * <li>schema is not indicated (only one element in the names list)<li/>
      *
-     * @param names             list of schema and table names, table name is always the last element
-     * @return throws a userexception if the schema is not valid.
+     * @param names list of schema and table names, table name is always the last element
+     * @throws UserException if the schema is not valid.
      */
     private void isValidSchema(final List<String> names) throws UserException {
       SchemaPlus defaultSchema = session.getDefaultSchema(this.rootSchema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a9c4742..a16bb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.server.options;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.DrillConfig;
@@ -40,15 +43,13 @@ import org.apache.drill.exec.util.AssertionUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
- * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
- * persist between restarts.
- */
 
 /**
+ *  <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
+ *  Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
+ *  persist between restarts.
+ *  </p>
+ *
  *  <p> All the system options are externalized into conf file. While adding a new system option
  *  a validator should be added and the default value for the option should be set in
  *  the conf files(example : drill-module.conf) under the namespace drill.exec.options.
@@ -173,6 +174,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR),
       new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
       new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
       new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL),
@@ -237,11 +239,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
-    final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap();
+    CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
+      .collect(Collectors.toMap(
+        d -> d.getValidator().getOptionName(),
+        Function.identity(),
+        (o, n) -> n,
+        CaseInsensitiveMap::newHashMap));
 
-    for (final OptionDefinition definition: definitions) {
-      map.put(definition.getValidator().getOptionName(), definition);
-    }
 
     if (AssertionUtil.isAssertionsEnabled()) {
       map.put(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, new OptionDefinition(ExecConstants.DRILLBIT_CONTROLS_VALIDATOR));
@@ -295,7 +299,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
    * Initializes this option manager.
    *
    * @return this option manager
-   * @throws Exception
+   * @throws Exception if unable to initialize option manager
    */
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
@@ -395,16 +399,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
 
   @Override
   public void deleteAllLocalOptions() {
-    final Set<String> names = Sets.newHashSet();
-    for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) {
-      names.add(entry.getKey());
-    }
-    for (final String name : names) {
-      options.delete(name); // should be lowercase
-    }
+    Iterable<Map.Entry<String, PersistedOptionValue>> allOptions = () -> options.getAll();
+    StreamSupport.stream(allOptions.spliterator(), false)
+      .map(Entry::getKey)
+      .forEach(name -> options.delete(name)); // should be lowercase
   }
 
-  public static CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
+  private CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) {
     // populate the options from the config
     final Map<String, OptionValue> defaults = new HashMap<>();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index de2b2c3..d37a0a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.PlannerPhase;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Abstract class for StorePlugin implementations.
  * See StoragePlugin for description of the interface intent and its methods.
@@ -102,12 +103,23 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     }
   }
 
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection);
+  }
+
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
 
   @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, columns);
+  }
+
+  @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 50f2731..2617065 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Interface for all implementations of the storage plugins. Different implementations of the storage
  * formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -36,18 +37,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
 
   /** Indicates if Drill can read the table from this format.
   */
-  public boolean supportsRead();
+  boolean supportsRead();
 
   /** Indicates if Drill can write a table to this format (e.g. as JSON, csv, etc.).
    */
-  public boolean supportsWrite();
+  boolean supportsWrite();
 
   /** An implementation of this method will return one or more specialized rules that Drill query
    *  optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set.
    * @return an empty set or a set of plugin specific physical optimizer rules.
    */
   @Deprecated
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
+  Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -55,9 +56,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * @param userName User whom to impersonate when when reading the contents as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @return The physical scan operator for the particular GroupScan (read) node.
-   * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
+
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException;
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -66,18 +76,29 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data source.
    * @return The physical scan operator for the particular GroupScan (read) node.
-   * @throws IOException
   */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException;
 
-  /** Method returns a Jackson serializable object that extends a StoragePluginConfig
-  * @return an extension of StoragePluginConfig
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when when reading the contents as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data source.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException;
+
+  /**
+   * Method returns a Jackson serializable object that extends a StoragePluginConfig.
+   *
+   * @return an extension of StoragePluginConfig
   */
-  public StoragePluginConfig getConfig();
+  StoragePluginConfig getConfig();
 
   /**
    * Initialize the storage plugin. The storage plugin will not be used until this method is called.
    */
-  public void start() throws IOException;
+  void start() throws IOException;
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4b1e9dd..b0cc209 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -555,6 +555,10 @@ drill.exec.options: {
     store.hive.optimize_scan_with_native_readers: false,
     store.hive.parquet.optimize_scan_with_native_reader: false,
     store.hive.maprdb_json.optimize_scan_with_native_reader: false,
+    # Properties values should NOT be set in double-quotes or any other quotes.
+    # Property name and value should be separated by =.
+    # Properties should be separated by new line (\n).
+    store.hive.conf.properties: "",
     store.json.all_text_mode: false,
     store.json.writer.allow_nan_inf: true,
     store.json.reader.allow_nan_inf: true,


[drill] 02/04: DRILL-6519: Add String Distance and Phonetic Functions

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 2ea603f8c8396015006885365921027e5b3e7392
Author: Charles S. Givre <cg...@gmail.com>
AuthorDate: Thu Jul 5 11:45:52 2018 -0400

    DRILL-6519: Add String Distance and Phonetic Functions
    
    closes #1331
---
 exec/java-exec/pom.xml                             |   6 +-
 .../drill/exec/expr/fn/impl/PhoneticFunctions.java | 407 +++++++++++++++++++++
 .../exec/expr/fn/impl/StringDistanceFunctions.java | 329 +++++++++++++++++
 .../drill/exec/fn/impl/TestPhoneticFunctions.java  | 119 ++++++
 .../exec/fn/impl/TestStringDistanceFunctions.java  |  80 ++++
 .../java/org/apache/drill/test/QueryBuilder.java   |  21 ++
 exec/jdbc-all/pom.xml                              |   2 +-
 7 files changed, 962 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 6d57ba3..3e1a118 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -97,7 +97,11 @@
       <artifactId>univocity-parsers</artifactId>
       <version>1.3.0</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+      <version>1.4</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/PhoneticFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/PhoneticFunctions.java
new file mode 100644
index 0000000..ee26bd3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/PhoneticFunctions.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import javax.inject.Inject;
+
+public class PhoneticFunctions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhoneticFunctions.class);
+
+  private PhoneticFunctions() {
+  }
+
+  /**
+   * The Caverphone function is a phonetic matching function.   This is an algorithm created by the Caversham Project at the University of Otago. It implements the Caverphone 1.0 algorithm.
+   * <p>
+   * <p>
+   * Usage:  SELECT caverphone1( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "caverphone1", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class Caverphone1Function implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.Caverphone1().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * The Caverphone function is a phonetic matching function.   This is an algorithm created by the Caversham Project at the University of Otago. It implements the Caverphone 2.0 algorithm.
+   * <p>
+   * Usage: SELECT caverphone2( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "caverphone2", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class Caverphone2Function implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.Caverphone2().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * Encodes a string into a Cologne Phonetic value.
+   * Implements the Kölner Phonetik (Cologne Phonetic) algorithm issued by Hans Joachim Postel in 1969.
+   * <p>
+   * The Kölner Phonetik is a phonetic algorithm which is optimized for the German language.
+   * It is related to the well-known soundex algorithm.
+   * <p>
+   * Usage:  SELECT cologne_phonetic( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "cologne_phonetic", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class ColognePhoneticFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.ColognePhonetic().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * Encodes a string into a Daitch-Mokotoff Soundex value.
+   * The Daitch-Mokotoff Soundex algorithm is a refinement of the Russel and American Soundex algorithms,
+   * yielding greater accuracy in matching especially Slavish and Yiddish surnames with similar pronunciation
+   * but differences in spelling.
+   * <p>
+   * The main differences compared to the other soundex variants are:
+   * coded names are 6 digits long
+   * the initial character of the name is coded
+   * rules to encoded multi-character n-grams
+   * multiple possible encodings for the same name (branching)
+   * <p>
+   * Usage:  SELECT dm_soundex( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "dm_soundex", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class DaitchMokotoffFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.DaitchMokotoffSoundex().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * Match Rating Approach Phonetic Algorithm Developed by Western Airlines in 1977.
+   * Usage:  SELECT match_rating_encoder( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "match_rating_encoder", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class MatchRatingFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.MatchRatingApproachEncoder().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * The New York State Identification and Intelligence System Phonetic Code, commonly known as NYSIIS, is a phonetic algorithm devised in 1970 as part of the New York State Identification and Intelligence System (now a part of the New York State Division of Criminal Justice Services). It features an accuracy increase of 2.7% over the traditional Soundex algorithm.
+   * Encodes a string into a NYSIIS value. NYSIIS is an encoding used to relate similar names, but can also be used as a general purpose scheme to find word with similar phonemes.
+   * <p>
+   * Usage: SELECT nysiis(string) FROM...
+   */
+
+  @FunctionTemplate(name = "nysiis", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class NYSIISFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.Nysiis().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+  }
+
+  /**
+   * Encodes a string into a Refined Soundex value. Soundex is an encoding used to relate similar names, but can also be used as a general purpose scheme to find word with similar phonemes.
+   * <p>
+   * Usage:  SELECT refined_soundex( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "refined_soundex", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class RefinedSoundexFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.RefinedSoundex().encode(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * Encodes a string into a Soundex value. Soundex is an encoding used to relate similar names, but can also be used as a general purpose scheme to find word with similar phonemes.
+   * <p>
+   * Usage:  SELECT soundex( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "soundex", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class SoundexFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.Soundex().soundex(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+  }
+
+  /**
+   * Implements the Metaphone phonetic algorithm (https://en.wikipedia.org/wiki/Metaphone),
+   * and calculates a given string's Metaphone value.
+   * <p>
+   * Usage: SELECT metaphone( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "metaphone", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class MetaphoneFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.Metaphone().metaphone(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+
+  }
+
+  /**
+   * Implements the Double Metaphone phonetic algorithm (https://en.wikipedia.org/wiki/Metaphone),
+   * and calculates a given string's Double Metaphone value.
+   * <p>
+   * Usage: SELECT double_metaphone( string ) FROM...
+   */
+
+  @FunctionTemplate(name = "double_metaphone", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class DoubleMetaphoneFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput;
+
+    @Output
+    VarCharHolder out;
+
+    @Inject
+    DrillBuf buffer;
+
+    @Override
+    public void setup() {
+    }
+
+    @Override
+    public void eval() {
+
+      String input = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
+      String outputString = new org.apache.commons.codec.language.DoubleMetaphone().doubleMetaphone(input);
+
+      out.buffer = buffer;
+      out.start = 0;
+      out.end = outputString.getBytes().length;
+      buffer.setBytes(0, outputString.getBytes());
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringDistanceFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringDistanceFunctions.java
new file mode 100644
index 0000000..0b02769
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringDistanceFunctions.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+public class StringDistanceFunctions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringDistanceFunctions.class);
+
+  private StringDistanceFunctions() {
+  }
+
+  /**
+   * This function calculates the cosine distance between two strings.
+   * Usage:  SELECT cosine_distance( string1, string2 ) AS cosine_distance FROM...
+   */
+
+  @FunctionTemplate(name = "cosine_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class CosineDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Workspace
+    org.apache.commons.text.similarity.CosineDistance d;
+
+    @Output
+    Float8Holder out;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.CosineDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+  /**
+   * This function calculates the cosine distance between two strings.
+   * A matching algorithm that is similar to the searching algorithms implemented in editors such
+   * as Sublime Text, TextMate, Atom and others.
+   * <p>
+   * One point is given for every matched character. Subsequent matches yield two bonus points. A higher score
+   * indicates a higher similarity.
+   * <p>
+   * <p>
+   * Usage:  SELECT fuzzy_score( string1, string2 ) AS fuzzy_score FROM...
+   */
+
+  @FunctionTemplate(name = "fuzzy_score", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class FuzzyScoreFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.FuzzyScore d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.FuzzyScore(java.util.Locale.ENGLISH);
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.fuzzyScore(input1, input2);
+      out.value = result;
+    }
+  }
+
+  /**
+   * The hamming distance between two strings of equal length is the number of
+   * positions at which the corresponding symbols are different.
+   * <p>
+   * For further explanation about the Hamming Distance, take a look at its
+   * Wikipedia page at http://en.wikipedia.org/wiki/Hamming_distance.
+   * <p>
+   * Usage:  SELECT hamming_distance( string1, string2 ) FROM...
+   */
+
+
+  @FunctionTemplate(name = "hamming_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class HammingDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.HammingDistance d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.HammingDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+
+  /**
+   * Measures the Jaccard distance of two sets of character sequence. Jaccard
+   * distance is the dissimilarity between two sets. It is the complementary of
+   * Jaccard similarity.
+   * <p>
+   * For further explanation about Jaccard Distance, refer
+   * https://en.wikipedia.org/wiki/Jaccard_index
+   * <p>
+   * Usage:  SELECT jaccard_distance( string1, string2 ) FROM ...
+   */
+
+
+  @FunctionTemplate(name = "jaccard_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class JaccardDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.JaccardDistance d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.JaccardDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+  /**
+   * A similarity algorithm indicating the percentage of matched characters between two character sequences.
+   * <p>
+   * The Jaro measure is the weighted sum of percentage of matched characters
+   * from each file and transposed characters. Winkler increased this measure
+   * for matching initial characters.
+   * <p>
+   * This implementation is based on the Jaro Winkler similarity algorithm
+   * from https://en.wikipedia.org/wiki/Jaro–Winkler_distance
+   * <p>
+   * Usage: SELECT jaro_distance( string1, string2 ) FROM...
+   */
+
+  @FunctionTemplate(name = "jaro_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class JaroDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.JaroWinklerDistance d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.JaroWinklerDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+  /**
+   * An algorithm for measuring the difference between two character sequences.
+   * <p>
+   * This is the number of changes needed to change one sequence into another,
+   * where each change is a single character modification (deletion, insertion
+   * or substitution).
+   * <p>
+   * Usage: SELECT levenshtein_distance( string1, string2 ) FROM...
+   */
+
+  @FunctionTemplate(name = "levenshtein_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class LevenstheinDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.LevenshteinDistance d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.LevenshteinDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+  /**
+   * The Longest common subsequence algorithm returns the length of the longest subsequence that two strings have in common.
+   * Two strings that are entirely different, return a value of 0, and two strings that return a value of the
+   * commonly shared length implies that the strings are completely the same in value and position.
+   * Note: Generally this algorithm is fairly inefficient, as for length m, n of the input
+   * CharSequence's left and right respectively, the runtime of the algorithm is O(m*n).
+   * <p>
+   * This implementation is based on the Longest Commons Substring algorithm from https://en.wikipedia.org/wiki/Longest_common_subsequence_problem.
+   * <p>
+   * Usage:  SELECT longest_common_substring_distance( string1, string2 ) FROM...
+   */
+
+  @FunctionTemplate(name = "longest_common_substring_distance", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class LongestCommonSubstringDistanceFunction implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder rawInput1;
+
+    @Param
+    VarCharHolder rawInput2;
+
+    @Output
+    Float8Holder out;
+
+    @Workspace
+    org.apache.commons.text.similarity.LongestCommonSubsequenceDistance d;
+
+    @Override
+    public void setup() {
+      d = new org.apache.commons.text.similarity.LongestCommonSubsequenceDistance();
+    }
+
+    @Override
+    public void eval() {
+
+      String input1 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput1.start, rawInput1.end, rawInput1.buffer);
+      String input2 = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput2.start, rawInput2.end, rawInput2.buffer);
+
+      double result = d.apply(input1, input2);
+      out.value = result;
+    }
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java
new file mode 100644
index 0000000..85bb135
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryResultSet;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({UnlikelyTest.class, SqlFunctionTest.class})
+public class TestPhoneticFunctions extends ClusterTest {
+
+  private QueryResultSet result;
+
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testSoundex() throws Exception {
+    String result = queryBuilder()
+        .sql("select soundex('jaime') as soundex from (values(1))")
+        .singletonString();
+    assertEquals("J500", result);
+  }
+
+  @Test
+  public void testCaverphone1() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT caverphone1('jaime') as caverphone FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("YM1111", result);
+  }
+
+  @Test
+  public void testCaverphone2() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT caverphone2('steve') as caverphone FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("STF1111111", result);
+  }
+
+  @Test
+  public void testCologne() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT cologne_phonetic('steve') AS CP FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("823", result);
+  }
+
+  @Test
+  public void testMatchRatingEncoder() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT match_rating_encoder('Boston') AS MR FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("BSTN", result);
+  }
+
+  @Test
+  public void testNYSIIS() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT nysiis('Boston') AS ny FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("BASTAN", result);
+  }
+
+  @Test
+  public void testRefinedSoundex() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT refined_soundex('Boston') AS rs FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("B103608", result);
+  }
+
+  @Test
+  public void testMetaphone() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT metaphone('Phoenix') AS meta FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("FNKS", result);
+  }
+
+  @Test
+  public void testDoubleMetaphone() throws Exception {
+    String result = queryBuilder()
+        .sql("SELECT double_metaphone('Phoenix') AS meta FROM (VALUES(1))")
+        .singletonString();
+    assertEquals("FNKS", result);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestStringDistanceFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestStringDistanceFunctions.java
new file mode 100644
index 0000000..915c062
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestStringDistanceFunctions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({UnlikelyTest.class, SqlFunctionTest.class})
+public class TestStringDistanceFunctions extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testCosineDistance() throws Exception {
+    double result = queryBuilder()
+        .sql("select cosine_distance( 'Big car', 'red car' ) as distance FROM (VALUES(1))")
+        .singletonDouble();
+    assertEquals(0.5000000000000001, result, 0.0);
+  }
+
+  @Test
+  public void testHammingDistance() throws Exception {
+    double result = queryBuilder()
+        .sql("select hamming_distance( 'Big car', 'red car' ) as distance FROM (VALUES(1))")
+        .singletonDouble();
+    assertEquals(3.0, result, 0.0);
+  }
+
+  @Test
+  public void testJaccardDistance() throws Exception {
+    double result = queryBuilder()
+        .sql("select jaccard_distance( 'Big car', 'red car' ) as distance FROM (VALUES(1))")
+        .singletonDouble();
+    assertEquals(0.56, result, 0.0);
+  }
+
+  @Test
+  public void testJaroDistance() throws Exception {
+    double result = queryBuilder()
+        .sql("select jaro_distance( 'Big car', 'red car' ) as distance FROM (VALUES(1))")
+        .singletonDouble();
+    assertEquals(0.7142857142857143, result, 0.0);
+  }
+
+  @Test
+  public void testLevenshteinDistance() throws Exception {
+    double result = queryBuilder()
+        .sql("select levenshtein_distance( 'Big car', 'red car' ) as distance FROM (VALUES(1))")
+        .singletonDouble();
+    assertEquals(3.0, result, 0.0);
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 0f86955..ff0e166 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -393,6 +393,27 @@ public class QueryBuilder {
 
   /**
    * Run the query that is expected to return (at least) one row
+   * with the only (or first) column returning a double value.
+   * The double value cannot be null.
+   *
+   * @return the value of the first column of the first row
+   * @throws RpcException if anything goes wrong
+   */
+
+  public double singletonDouble() throws RpcException {
+    RowSet rowSet = rowSet();
+    if (rowSet == null) {
+      throw new IllegalStateException("No rows returned");
+    }
+    RowSetReader reader = rowSet.reader();
+    reader.next();
+    double value = reader.scalar(0).getDouble();
+    rowSet.clear();
+    return value;
+  }
+
+  /**
+   * Run the query that is expected to return (at least) one row
    * with the only (or first) column returning a int value.
    * The int value cannot be null.
    *
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index f3595be..f7af511 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -566,7 +566,7 @@
                           This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                         </message>
-                        <maxsize>33000000</maxsize>
+                        <maxsize>34000000</maxsize>
                         <minsize>15000000</minsize>
                         <files>
                           <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>