Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:04 UTC

[drill] branch master updated (72cba88 -> b557b79)

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 72cba88  Update Javadocs.md
     new c230ba5  DRILL-6533: Allow using literal values in functions which expect FieldReader instead of ValueHolder
     new 03f4677  DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical phase
     new 1887cce  DRILL-6947: Fix RuntimeFilter memory leak
     new 4e03d54  DRILL-6999: Fix the case when there is more than one join condition
     new 780a3fb  DRILL-6962: Function coalesce returns an Error when none of the columns in coalesce exist in a parquet file
     new a00f180  DRILL-6977: Improve Hive tests configuration
     new 5026cd1  DRILL-6985: Fix sqlline.bat issues on Windows and add drill-embedded.bat
     new b557b79  DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/drill/exec/hive/HiveTestBase.java   |  44 ++-
 .../apache/drill/exec/hive/HiveTestFixture.java    | 295 +++++++++++++++++++++
 .../apache/drill/exec/hive/HiveTestUtilities.java  |  36 +++
 .../apache/drill/exec/hive/TestHiveStorage.java    |  34 +--
 .../hive/BaseTestHiveImpersonation.java            |  18 +-
 .../exec/sql/hive/TestViewSupportOnHiveTables.java |  47 ++--
 .../exec/store/hive/HiveTestDataGenerator.java     | 137 ++--------
 distribution/src/assemble/component.xml            |   6 +
 distribution/src/resources/drill-embedded.bat      |  24 ++
 distribution/src/resources/sqlline.bat             |  41 ++-
 exec/java-exec/src/main/codegen/data/Casts.tdd     |  17 ++
 .../{CastFunctions.java => CastUntypedNull.java}   |  44 ++-
 .../expr/fn/interpreter/InterpreterEvaluator.java  |  50 +++-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  44 ++-
 .../exec/physical/impl/join/HashJoinBatch.java     |   8 +-
 .../apache/drill/exec/planner/PlannerPhase.java    |   3 +-
 .../logical/DrillPushProjectIntoScanRule.java      | 111 ++++++--
 .../drill/exec/record/RecordBatchLoader.java       |   6 +
 .../exec/rpc/data/DataServerRequestHandler.java    |  13 +-
 .../drill/exec/work/filter/RuntimeFilterSink.java  |  16 +-
 .../exec/work/filter/RuntimeFilterWritable.java    |   5 +-
 .../java/org/apache/drill/TestJoinNullable.java    |  19 ++
 .../java/org/apache/drill/TestProjectPushDown.java |  20 +-
 .../java/org/apache/drill/TestUntypedNull.java     | 119 +++++++++
 .../drill/exec/expr/fn/impl/TestTypeFns.java       | 133 +++++++++-
 .../drill/exec/fn/impl/TestCastFunctions.java      |  70 +++++
 .../apache/drill/exec/sql/TestBaseViewSupport.java |  38 +++
 .../org/apache/drill/exec/sql/TestViewSupport.java | 106 +++-----
 .../org/apache/drill/jdbc/impl/DrillCursor.java    |   9 +-
 .../codegen/templates/AbstractFieldReader.java     |   4 +-
 .../main/codegen/templates/BasicTypeHelper.java    |  32 ++-
 .../impl => }/UntypedHolderReaderImpl.java         |   5 +-
 .../drill/exec/vector/UntypedNullVector.java       |  18 +-
 .../vector/{complex/impl => }/UntypedReader.java   |   4 +-
 .../{complex/impl => }/UntypedReaderImpl.java      |   5 +-
 .../exec/vector/complex/reader/FieldReader.java    |   6 +-
 36 files changed, 1196 insertions(+), 391 deletions(-)
 create mode 100644 contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
 create mode 100644 distribution/src/resources/drill-embedded.bat
 copy exec/java-exec/src/main/codegen/templates/{CastFunctions.java => CastUntypedNull.java} (65%)
 rename exec/vector/src/main/java/org/apache/drill/exec/vector/{complex/impl => }/UntypedHolderReaderImpl.java (92%)
 rename exec/vector/src/main/java/org/apache/drill/exec/vector/{complex/impl => }/UntypedReader.java (90%)
 rename exec/vector/src/main/java/org/apache/drill/exec/vector/{complex/impl => }/UntypedReaderImpl.java (92%)


[drill] 02/08: DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical phase

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 03f4677667ffed11c3c3ac0e80eb354436bf311e
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Jan 23 01:21:13 2019 +0200

    DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical phase
    
    closes #1619
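
A minimal sketch of the case the new physical-phase rule instance covers (the queries are taken from the testProjectPushdownAfterFilterRemoving test added in this commit): once a filter that matches every row is dropped during physical planning, the remaining project can still be pushed into the Parquet scan, so only the projected column is read.

    create table dfs.tmp.`nation` as
      select * from cp.`tpch/nation.parquet` where n_regionkey < 10;

    -- every row of the new table satisfies n_regionkey < 10, so the filter is
    -- removed from the physical plan; DrillPushProjectIntoScanRule.DRILL_PHYSICAL_INSTANCE
    -- still narrows the scan to columns=[`n_nationkey`]
    select n_nationkey from dfs.tmp.`nation` where n_regionkey < 10;

    drop table if exists dfs.tmp.`nation`;
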
---
 .../apache/drill/exec/planner/PlannerPhase.java    |   3 +-
 .../logical/DrillPushProjectIntoScanRule.java      | 111 +++++++++++++++++----
 .../java/org/apache/drill/TestProjectPushDown.java |  20 +++-
 3 files changed, 111 insertions(+), 23 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 91d9d43..2d2b073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -447,7 +447,8 @@ public enum PlannerPhase {
             // estimation of filter operator, after filter is pushed down to scan.
 
             ParquetPushDownFilter.getFilterOnProject(optimizerRulesContext),
-            ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext)
+            ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext),
+            DrillPushProjectIntoScanRule.DRILL_PHYSICAL_INSTANCE
         )
         .build();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
index db20cb7..8d0ac84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
@@ -25,10 +25,16 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.util.Utilities;
 
 import java.io.IOException;
@@ -43,51 +49,76 @@ public class DrillPushProjectIntoScanRule extends RelOptRule {
   public static final RelOptRule INSTANCE =
       new DrillPushProjectIntoScanRule(LogicalProject.class,
           EnumerableTableScan.class,
-          "DrillPushProjIntoEnumerableScan");
+          "DrillPushProjIntoEnumerableScan") {
+
+        @Override
+        protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
+          // do not allow skipping conversion of EnumerableTableScan to DrillScanRel if rule is applicable
+          return false;
+        }
+      };
 
   public static final RelOptRule DRILL_LOGICAL_INSTANCE =
       new DrillPushProjectIntoScanRule(LogicalProject.class,
           DrillScanRel.class,
           "DrillPushProjIntoDrillRelScan");
 
+  public static final RelOptRule DRILL_PHYSICAL_INSTANCE =
+      new DrillPushProjectIntoScanRule(ProjectPrel.class,
+          ScanPrel.class,
+          "DrillPushProjIntoScanPrel") {
+
+        @Override
+        protected ScanPrel createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
+          ScanPrel drillScan = (ScanPrel) scan;
+
+          return new ScanPrel(drillScan.getCluster(),
+              drillScan.getTraitSet().plus(Prel.DRILL_PHYSICAL),
+              drillScan.getGroupScan().clone(projectPushInfo.getFields()),
+              projectPushInfo.createNewRowType(drillScan.getCluster().getTypeFactory()),
+              drillScan.getTable());
+        }
+
+        @Override
+        protected ProjectPrel createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
+          return new ProjectPrel(project.getCluster(),
+              project.getTraitSet().plus(Prel.DRILL_PHYSICAL),
+              newScan,
+              newProjects,
+              project.getRowType());
+        }
+      };
+
   private DrillPushProjectIntoScanRule(Class<? extends Project> projectClass, Class<? extends TableScan> scanClass, String description) {
     super(RelOptHelper.some(projectClass, RelOptHelper.any(scanClass)), description);
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final Project project = call.rel(0);
-    final TableScan scan = call.rel(1);
+    Project project = call.rel(0);
+    TableScan scan = call.rel(1);
 
     try {
-
       if (scan.getRowType().getFieldList().isEmpty()) {
         return;
       }
 
       ProjectPushInfo projectPushInfo = DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects());
-      if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)) {
+      if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)
+          || skipScanConversion(projectPushInfo.createNewRowType(project.getCluster().getTypeFactory()), scan)) {
+        // project above scan may be removed in ProjectRemoveRule for the case when it is trivial
         return;
       }
 
-      final DrillScanRel newScan =
-          new DrillScanRel(scan.getCluster(),
-              scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scan.getTable(),
-              projectPushInfo.createNewRowType(project.getInput().getCluster().getTypeFactory()),
-              projectPushInfo.getFields());
+      DrillScanRelBase newScan = createScan(scan, projectPushInfo);
 
       List<RexNode> newProjects = new ArrayList<>();
       for (RexNode n : project.getChildExps()) {
         newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
       }
 
-      final DrillProjectRel newProject =
-          new DrillProjectRel(project.getCluster(),
-              project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              newScan,
-              newProjects,
-              project.getRowType());
+      DrillProjectRelBase newProject =
+          createProject(project, newScan, newProjects);
 
       if (ProjectRemoveRule.isTrivial(newProject)) {
         call.transformTo(newScan);
@@ -100,6 +131,52 @@ public class DrillPushProjectIntoScanRule extends RelOptRule {
   }
 
   /**
+   * Checks whether conversion of input {@code TableScan} instance to target
+   * {@code TableScan} may be omitted.
+   *
+   * @param projectRelDataType project rel data type
+   * @param scan               TableScan to be checked
+   * @return true if specified {@code TableScan} has the same row type as specified.
+   */
+  protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
+    return projectRelDataType.equals(scan.getRowType());
+  }
+
+  /**
+   * Creates new {@code DrillProjectRelBase} instance with specified {@code TableScan newScan} child
+   * and {@code List<RexNode> newProjects} expressions using specified {@code Project project} as prototype.
+   *
+   * @param project     the prototype of resulting project
+   * @param newScan     new project child
+   * @param newProjects new project expressions
+   * @return new project instance
+   */
+  protected DrillProjectRelBase createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
+    return new DrillProjectRel(project.getCluster(),
+        project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        newScan,
+        newProjects,
+        project.getRowType());
+  }
+
+  /**
+   * Creates new {@code DrillScanRelBase} instance with row type and fields list
+   * obtained from specified {@code ProjectPushInfo projectPushInfo}
+   * using specified {@code TableScan scan} as prototype.
+   *
+   * @param scan            the prototype of resulting scan
+   * @param projectPushInfo the source of row type and fields list
+   * @return new scan instance
+   */
+  protected DrillScanRelBase createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
+    return new DrillScanRel(scan.getCluster(),
+        scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        scan.getTable(),
+        projectPushInfo.createNewRowType(scan.getCluster().getTypeFactory()),
+        projectPushInfo.getFields());
+  }
+
+  /**
    * Push project into scan be done only if this is not a star query and
    * table supports project push down.
    *
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
index ced105c..f152ba3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -39,7 +39,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testGroupBy() throws Exception {
     String expectedColNames = " \"columns\" : [ \"`marital_status`\" ]";
     testPhysicalPlan(
@@ -48,7 +47,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testOrderBy() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan("select employee_id , full_name, first_name , last_name "
@@ -57,7 +55,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testExprInSelect() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan(
@@ -67,7 +64,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testExprInWhere() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan(
@@ -291,12 +287,26 @@ public class TestProjectPushDown extends PlanTestBase {
     final String query = "SELECT L.L_QUANTITY FROM cp.`tpch/lineitem.parquet` L, cp.`tpch/orders.parquet` O" +
         " WHERE cast(L.L_ORDERKEY as int) = cast(O.O_ORDERKEY as int)";
     final String[] expectedPatterns = {
-        ".*HashJoin.*", "Project.*\\(L_QUANTITY.*CAST\\(\\$0\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
+        ".*HashJoin.*", "Project.*\\(L_QUANTITY\\=\\[\\$0\\].*CAST\\(\\$1\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
     // L_ORDERKEY, O_ORDERKEY should not be present in the projects below the join
     final String[] excludedPatterns = {".*Project\\(L_ORDERKEY=.*", ".*Project\\(O_ORDERKEY=.*"};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns);
   }
 
+  @Test
+  public void testProjectPushdownAfterFilterRemoving() throws Exception {
+    test("create table dfs.tmp.`nation` as\n" +
+        "select * from cp.`tpch/nation.parquet` where n_regionkey < 10");
+    try {
+      // filter will be removed form the plan
+      String query = "select n_nationkey from dfs.tmp.`nation` where n_regionkey < 10";
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{"columns\\=\\[`n_nationkey`\\]"}, new String[]{"n_regionkey"});
+    } finally {
+      test("drop table if exists dfs.tmp.`nation`");
+    }
+  }
+
   protected void testPushDown(PushDownTestInstance test) throws Exception {
     testPhysicalPlan(test.getSql(), test.getExpected());
   }


[drill] 01/08: DRILL-6533: Allow using literal values in functions which expect FieldReader instead of ValueHolder

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c230ba55cceb6d48ac9c1ab0701a167d91842a11
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Tue Jan 22 00:18:19 2019 +0200

    DRILL-6533: Allow using literal values in functions which expect FieldReader instead of ValueHolder
    
    closes #1617
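
A minimal sketch of what this change allows (the individual calls and expected results are taken from the TestTypeFns tests added below): functions that declare a FieldReader parameter, such as typeof(), sqlTypeOf(), drillTypeOf() and modeOf(), can now be evaluated over constant arguments at planning time, because the interpreter wraps each literal's ValueHolder in the matching holder reader.

    -- each constant argument is materialized as a ValueHolder and wrapped via
    -- BasicTypeHelper.getHolderReaderImpl() before the function is interpreted
    select typeof(1) c1,                  -- INT
           sqlTypeOf('a') c2,             -- CHARACTER VARYING
           drillTypeOf(12.3) c3,          -- VARDECIMAL
           modeOf(cast(null as int)) c4;  -- NULLABLE
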
---
 .../expr/fn/interpreter/InterpreterEvaluator.java  |  50 +++++---
 .../drill/exec/expr/fn/impl/TestTypeFns.java       | 133 +++++++++++++++++++--
 .../main/codegen/templates/BasicTypeHelper.java    |  30 ++++-
 3 files changed, 187 insertions(+), 26 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
index a0373d9..7648ff4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
+import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.BooleanOperator;
@@ -89,7 +90,17 @@ public class InterpreterEvaluator {
 
   }
 
-  public static ValueHolder evaluateFunction(DrillSimpleFunc interpreter, ValueHolder[] args, String funcName) throws Exception {
+  /**
+   * Assigns specified {@code Object[] args} to the function arguments,
+   * evaluates function and returns its result.
+   *
+   * @param interpreter function to be evaluated
+   * @param args        function arguments
+   * @param funcName    name of the function
+   * @return result of function call stored in {@link ValueHolder}
+   * @throws Exception if {@code args} types do not match function input argument types
+   */
+  public static ValueHolder evaluateFunction(DrillSimpleFunc interpreter, Object[] args, String funcName) throws Exception {
     Preconditions.checkArgument(interpreter != null, "interpreter could not be null when use interpreted model to evaluate function " + funcName);
 
     // the current input index to assign into the next available parameter, found using the @Param notation
@@ -100,13 +111,13 @@ public class InterpreterEvaluator {
       Field[] fields = interpreter.getClass().getDeclaredFields();
       for (Field f : fields) {
         // if this is annotated as a parameter to the function
-        if ( f.getAnnotation(Param.class) != null ) {
+        if (f.getAnnotation(Param.class) != null) {
           f.setAccessible(true);
           if (currParameterIndex < args.length) {
             f.set(interpreter, args[currParameterIndex]);
           }
           currParameterIndex++;
-        } else if ( f.getAnnotation(Output.class) != null ) {
+        } else if (f.getAnnotation(Output.class) != null) {
           f.setAccessible(true);
           outField = f;
           // create an instance of the holder for the output to be stored in
@@ -127,9 +138,8 @@ public class InterpreterEvaluator {
     }
     interpreter.setup();
     interpreter.eval();
-    ValueHolder out = (ValueHolder) outField.get(interpreter);
 
-    return out;
+    return (ValueHolder) outField.get(interpreter);
   }
 
   private static class InitVisitor extends AbstractExprVisitor<LogicalExpression, VectorAccessible, RuntimeException> {
@@ -307,25 +317,36 @@ public class InterpreterEvaluator {
 
       DrillSimpleFuncHolder holder = (DrillSimpleFuncHolder) holderExpr.getHolder();
 
-      ValueHolder [] args = new ValueHolder [holderExpr.args.size()];
+      // function arguments may have different types:
+      // usually ValueHolder inheritors but sometimes FieldReader ones
+      Object[] args = new Object[holderExpr.args.size()];
       for (int i = 0; i < holderExpr.args.size(); i++) {
-        args[i] = holderExpr.args.get(i).accept(this, inIndex);
+        ValueHolder valueHolder = holderExpr.args.get(i).accept(this, inIndex);
+        Object resultArg = valueHolder;
+        TypeProtos.MajorType argType = TypeHelper.getValueHolderType(valueHolder);
+        TypeProtos.MajorType holderParamType = holder.getParameters()[i].getType();
         // In case function use "NULL_IF_NULL" policy.
         if (holder.getNullHandling() == FunctionTemplate.NullHandling.NULL_IF_NULL) {
           // Case 1: parameter is non-nullable, argument is nullable.
-          if (holder.getParameters()[i].getType().getMode() == TypeProtos.DataMode.REQUIRED && TypeHelper.getValueHolderType(args[i]).getMode() == TypeProtos.DataMode.OPTIONAL) {
+          if (holderParamType.getMode() == TypeProtos.DataMode.REQUIRED
+              && argType.getMode() == TypeProtos.DataMode.OPTIONAL) {
             // Case 1.1 : argument is null, return null value holder directly.
-            if (TypeHelper.isNull(args[i])) {
+            if (TypeHelper.isNull(valueHolder)) {
               return TypeHelper.createValueHolder(holderExpr.getMajorType());
             } else {
               // Case 1.2: argument is nullable but not null value, deNullify it.
-              args[i] = TypeHelper.deNullify(args[i]);
+              resultArg = TypeHelper.deNullify(valueHolder);
             }
-          } else if (holder.getParameters()[i].getType().getMode() == TypeProtos.DataMode.OPTIONAL && TypeHelper.getValueHolderType(args[i]).getMode() == TypeProtos.DataMode.REQUIRED) {
+          } else if (holderParamType.getMode() == TypeProtos.DataMode.OPTIONAL
+              && argType.getMode() == TypeProtos.DataMode.REQUIRED) {
             // Case 2: parameter is nullable, argument is non-nullable. Nullify it.
-            args[i] = TypeHelper.nullify(args[i]);
+            resultArg = TypeHelper.nullify(valueHolder);
           }
         }
+        if (holder.getParameters()[i].isFieldReader()) {
+          resultArg = BasicTypeHelper.getHolderReaderImpl(argType, valueHolder);
+        }
+        args[i] = resultArg;
       }
 
       try {
@@ -333,10 +354,11 @@ public class InterpreterEvaluator {
 
         ValueHolder out = evaluateFunction(interpreter, args, holderExpr.getName());
 
-        if (TypeHelper.getValueHolderType(out).getMode() == TypeProtos.DataMode.OPTIONAL &&
+        TypeProtos.MajorType outputType = TypeHelper.getValueHolderType(out);
+        if (outputType.getMode() == TypeProtos.DataMode.OPTIONAL &&
             holderExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
           return TypeHelper.deNullify(out);
-        } else if (TypeHelper.getValueHolderType(out).getMode() == TypeProtos.DataMode.REQUIRED &&
+        } else if (outputType.getMode() == TypeProtos.DataMode.REQUIRED &&
               holderExpr.getMajorType().getMode() == TypeProtos.DataMode.OPTIONAL) {
           return TypeHelper.nullify(out);
         } else {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
index 02d664f..46e3216 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/impl/TestTypeFns.java
@@ -75,19 +75,19 @@ public class TestTypeFns extends ClusterTest {
     // typeof() returns types using the internal names.
 
     String sql = "SELECT typeof(CAST(a AS " + castType + ")) FROM (VALUES (1)) AS T(a)";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
     assertEquals(resultType, result);
 
     // For typeof(), null values annoyingly report a type of "NULL"
 
     sql = "SELECT typeof(CAST(a AS " + castType + ")) FROM cp.`functions/null.json`";
-    result = client.queryBuilder().sql(sql).singletonString();
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals("NULL", result);
   }
 
   private void doTypeOfTestSpecial(String expr, String value, String resultType) throws RpcException {
     String sql = "SELECT typeof(" + expr + ") FROM (VALUES (" + value + ")) AS T(a)";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
     assertEquals(resultType, result);
   }
 
@@ -124,19 +124,25 @@ public class TestTypeFns extends ClusterTest {
     // sqlTypeOf() returns SQL type names: the names used in CAST.
 
     String sql = "SELECT sqlTypeOf(CAST(a AS " + type + ")) FROM (VALUES (1)) AS T(a)";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
+    assertEquals(type, result);
+
+    // sqlTypeOf() returns SQL type names: the names used in CAST.
+
+    sql = "SELECT sqlTypeOf(CAST(1 AS " + type + "))";
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals(type, result);
 
     // Returns same type even value is null.
 
     sql = "SELECT sqlTypeOf(CAST(a AS " + type + ")) FROM cp.`functions/null.json`";
-    result = client.queryBuilder().sql(sql).singletonString();
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals(type, result);
   }
 
   private void doSqlTypeOfTestSpecial(String expr, String value, String resultType) throws RpcException {
     String sql = "SELECT sqlTypeof(" + expr + ") FROM (VALUES (" + value + ")) AS T(a)";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
     assertEquals(resultType, result);
   }
 
@@ -163,13 +169,17 @@ public class TestTypeFns extends ClusterTest {
     // drillTypeOf() returns types using the internal names.
 
     String sql = "SELECT drillTypeOf(CAST(a AS " + castType + ")) FROM (VALUES (1)) AS T(a)";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
+    assertEquals(resultType, result);
+
+    sql = "SELECT drillTypeOf(CAST(1 AS " + castType + "))";
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals(resultType, result);
 
     // Returns same type even value is null.
 
     sql = "SELECT drillTypeOf(CAST(a AS " + castType + ")) FROM cp.`functions/null.json`";
-    result = client.queryBuilder().sql(sql).singletonString();
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals(resultType, result);
   }
 
@@ -179,19 +189,120 @@ public class TestTypeFns extends ClusterTest {
     // CSV files with headers use REQUIRED mode
 
     String sql = "SELECT modeOf(`name`) FROM cp.`store/text/data/cars.csvh`";
-    String result = client.queryBuilder().sql(sql).singletonString();
+    String result = queryBuilder().sql(sql).singletonString();
     assertEquals("NOT NULL", result);
 
     // CSV files without headers use REPEATED mode
 
     sql = "SELECT modeOf(`columns`) FROM cp.`textinput/input2.csv`";
-    result = client.queryBuilder().sql(sql).singletonString();
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals("ARRAY", result);
 
     // JSON files use OPTIONAL mode
 
     sql = "SELECT modeOf(`name`) FROM cp.`jsoninput/specialchar.json`";
-    result = client.queryBuilder().sql(sql).singletonString();
+    result = queryBuilder().sql(sql).singletonString();
     assertEquals("NULLABLE", result);
   }
+
+  @Test
+  public void testTypeOfLiteral() throws Exception {
+    String sql =
+        "SELECT typeOf(1) c1," +
+              "typeOf('a') c2," +
+              "typeOf(date '2018-01-22') c3," +
+              "typeOf(time '01:00:20.123') c4," +
+              "typeOf(timestamp '2018-01-22 01:00:20.123') c5," +
+              "typeOf(false) c6," +
+              "typeOf(12.3) c7," +
+              "typeOf(1>2) c8," +
+              "typeOf(cast(null as int)) c9";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9")
+        .baselineValues("INT", "VARCHAR", "DATE", "TIME", "TIMESTAMP", "BIT", "VARDECIMAL", "BIT", "NULL")
+        .go();
+  }
+
+  @Test
+  public void testSqlTypeOfLiteral() throws Exception {
+    String sql =
+      "SELECT sqlTypeOf(1) c1," +
+            "sqlTypeOf('a') c2," +
+            "sqlTypeOf(date '2018-01-22') c3," +
+            "sqlTypeOf(time '01:00:20.123') c4," +
+            "sqlTypeOf(timestamp '2018-01-22 01:00:20.123') c5," +
+            "sqlTypeOf(false) c6," +
+            "sqlTypeOf(12.3) c7," +
+            "sqlTypeOf(1>2) c8," +
+            "sqlTypeOf(cast(null as int)) c9";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9")
+        .baselineValues("INTEGER", "CHARACTER VARYING", "DATE", "TIME",
+            "TIMESTAMP", "BOOLEAN", "DECIMAL(3, 1)", "BOOLEAN", "INTEGER")
+        .go();
+  }
+
+  @Test
+  public void testDrillTypeOfLiteral() throws Exception {
+    String sql =
+        "SELECT drillTypeOf(1) c1," +
+              "drillTypeOf('a') c2," +
+              "drillTypeOf(date '2018-01-22') c3," +
+              "drillTypeOf(time '01:00:20.123') c4," +
+              "drillTypeOf(timestamp '2018-01-22 01:00:20.123') c5," +
+              "drillTypeOf(false) c6," +
+              "drillTypeOf(12.3) c7," +
+              "drillTypeOf(1>2) c8," +
+              "drillTypeOf(cast(null as int)) c9";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9")
+        .baselineValues("INT", "VARCHAR", "DATE", "TIME",
+            "TIMESTAMP", "BIT", "VARDECIMAL", "BIT", "INT")
+        .go();
+  }
+
+  @Test
+  public void testModeOfLiteral() throws Exception {
+    String sql =
+        "SELECT modeOf(1) c1," +
+              "modeOf('a') c2," +
+              "modeOf(cast(null as int)) c3," +
+              "modeOf(case when true then null else 'a' end) c4," +
+              "modeOf(case when false then null else 'a' end) c5";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c1", "c2", "c3", "c4", "c5")
+        .baselineValues("NOT NULL", "NOT NULL", "NULLABLE", "NULLABLE", "NULLABLE")
+        .go();
+  }
+
+  @Test
+  public void testCompareTypeLiteral() throws Exception {
+    String sql =
+        "SELECT compareType(1, 2) c1," +
+              "compareType('a', 1) c2," +
+              "compareType(1, 'a') c3," +
+              "compareType(a, '01:00:20.123') c4," +
+              "compareType(3, t.a) c5," +
+              "compareType(t.a, 3) c6\n" +
+        "from (values(1)) t(a)";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c1", "c2", "c3", "c4", "c5", "c6")
+        .baselineValues(0, 1, -1, -1, 0, 0)
+        .go();
+  }
 }
diff --git a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
index 430a41b..383e195 100644
--- a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
+++ b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
@@ -207,7 +207,35 @@ public class BasicTypeHelper {
       throw new UnsupportedOperationException(buildErrorMessage("get writer implementation", type, mode));
   }
 
-  public static Class<?> getHolderReaderImpl( MinorType type, DataMode mode){
+  /**
+   * Creates and returns {@link FieldReader} instance for specified {@code MajorType type} using specified {@code ValueHolder}
+   *
+   * @param type   type of resulting {@link FieldReader} instance
+   * @param holder value holder for {@link FieldReader} creation
+   * @return {@link FieldReader} instance
+   */
+  public static FieldReader getHolderReaderImpl(MajorType type, ValueHolder holder) {
+    switch (type.getMinorType()) {
+    <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (type.getMode()) {
+          case REQUIRED:
+            return new ${minor.class}HolderReaderImpl((${minor.class}Holder) holder);
+          case OPTIONAL:
+            return new Nullable${minor.class}HolderReaderImpl((Nullable${minor.class}Holder) holder);
+          case REPEATED:
+            return new Repeated${minor.class}HolderReaderImpl((Repeated${minor.class}Holder) holder);
+      }
+      </#list>
+    </#list>
+      case NULL:
+        return new UntypedHolderReaderImpl((UntypedNullHolder) holder);
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("get holder reader implementation", type.getMinorType(), type.getMode()));
+  }
+
+  public static Class<?> getHolderReaderImpl(MinorType type, DataMode mode) {
     switch (type) {      
 <#list vv.types as type>
   <#list type.minor as minor>


[drill] 05/08: DRILL-6962: Function coalesce returns an Error when none of the columns in coalesce exist in a parquet file

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 780a3fbb0222ac037d3c559dc88c7f9dbd48b0cb
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Thu Dec 20 20:58:16 2018 +0200

    DRILL-6962: Function coalesce returns an Error when none of the columns in coalesce exist in a parquet file
    
    - Updated UntypedNullVector to hold the value count when the vector is allocated and transferred to another one;
    - Updated RecordBatchLoader and DrillCursor to handle the case when only UntypedNull values are present in a RecordBatch (the special case when the data buffer is null but actual values are present);
    - Added functions to cast UntypedNull values to other types for use in UDFs;
    - Moved UntypedReader, UntypedHolderReaderImpl and UntypedReaderImpl from org.apache.drill.exec.vector.complex.impl to org.apache.drill.exec.vector package.
    
    closes #1614
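
A minimal sketch of the behavior this commit enables (both queries come from the TestUntypedNull and TestCastFunctions tests added below): COALESCE over columns that exist in none of the scanned Parquet files now produces an untyped-null column instead of an error, and the new cast functions allow converting that column to a concrete type.

    -- neither unk1 nor unk2 exists in nation.parquet; the query now returns a
    -- nullable untyped-null column instead of failing
    select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5;

    -- the generated CastUntypedNullInt function maps the untyped null to a
    -- nullable INT with isSet = 0
    select cast(coalesce(unk1, unk2) as int) as coal from cp.`tpch/nation.parquet` limit 1;
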
---
 exec/java-exec/src/main/codegen/data/Casts.tdd     |  17 +++
 .../main/codegen/templates/CastUntypedNull.java    |  61 +++++++++++
 .../drill/exec/record/RecordBatchLoader.java       |   6 ++
 .../java/org/apache/drill/TestJoinNullable.java    |  19 ++++
 .../java/org/apache/drill/TestUntypedNull.java     | 119 +++++++++++++++++++++
 .../drill/exec/fn/impl/TestCastFunctions.java      |  70 ++++++++++++
 .../org/apache/drill/jdbc/impl/DrillCursor.java    |   9 +-
 .../codegen/templates/AbstractFieldReader.java     |   4 +-
 .../main/codegen/templates/BasicTypeHelper.java    |   2 +
 .../impl => }/UntypedHolderReaderImpl.java         |   5 +-
 .../drill/exec/vector/UntypedNullVector.java       |  18 ++--
 .../vector/{complex/impl => }/UntypedReader.java   |   4 +-
 .../{complex/impl => }/UntypedReaderImpl.java      |   5 +-
 .../exec/vector/complex/reader/FieldReader.java    |   6 +-
 14 files changed, 319 insertions(+), 26 deletions(-)

diff --git a/exec/java-exec/src/main/codegen/data/Casts.tdd b/exec/java-exec/src/main/codegen/data/Casts.tdd
index 31eef19..7652ab7 100644
--- a/exec/java-exec/src/main/codegen/data/Casts.tdd
+++ b/exec/java-exec/src/main/codegen/data/Casts.tdd
@@ -168,5 +168,22 @@
     {from: "NullableVarBinary", to: "NullableFloat8", major: "EmptyString", javaType:"Double", parse:"Double"},
 
     {from: "NullableVarBinary", to: "NullableVarDecimal", major: "NullableVarCharDecimalComplex"},
+
+    {from: "UntypedNull", to: "Bit", major: "UntypedNull"},
+    {from: "UntypedNull", to: "TinyInt", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Int", major: "UntypedNull"},
+    {from: "UntypedNull", to: "BigInt", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Float4", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Float8", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Date", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Time", major: "UntypedNull"},
+    {from: "UntypedNull", to: "TimeStamp", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Interval", major: "UntypedNull"},
+    {from: "UntypedNull", to: "IntervalDay", major: "UntypedNull"},
+    {from: "UntypedNull", to: "IntervalYear", major: "UntypedNull"},
+    {from: "UntypedNull", to: "VarBinary", major: "UntypedNull"},
+    {from: "UntypedNull", to: "VarChar", major: "UntypedNull"},
+    {from: "UntypedNull", to: "Var16Char", major: "UntypedNull"},
+    {from: "UntypedNull", to: "VarDecimal", major: "UntypedNull"}
   ]
 }
diff --git a/exec/java-exec/src/main/codegen/templates/CastUntypedNull.java b/exec/java-exec/src/main/codegen/templates/CastUntypedNull.java
new file mode 100644
index 0000000..4a22337
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/CastUntypedNull.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+<#list cast.types as type>
+<#if type.major == "UntypedNull">
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gcast/Cast${type.from}${type.to}.java" />
+
+<#include "/@includes/license.ftl" />
+package org.apache.drill.exec.expr.fn.impl.gcast;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@FunctionTemplate(name = "cast${type.to?upper_case}",
+    scope = FunctionTemplate.FunctionScope.SIMPLE,
+    nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+public class Cast${type.from}${type.to} implements DrillSimpleFunc {
+
+  @Param ${type.from}Holder in;
+  <#if type.to == "VarDecimal">
+  @Param IntHolder precision;
+  @Param IntHolder scale;
+  <#elseif type.to == "VarChar" || type.to == "VarBinary" || type.to == "Var16Char">
+  @Param BigIntHolder len;
+  </#if>
+  @Output Nullable${type.to}Holder out;
+
+  public void setup() {
+  }
+
+  public void eval() {
+    out.isSet = 0;
+  }
+}
+</#if> <#-- type.major -->
+</#list>
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 696d6db..cd3a22f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,6 +129,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 
         // Load the vector.
         if (buf == null) {
+          // Buffers for untyped null vectors are always null and for the case
+          // field value alone is sufficient to load the vector
+          if (vector instanceof UntypedNullVector) {
+            vector.load(field, null);
+          }
           // Schema only
         } else if (field.getValueCount() == 0) {
           AllocationHelper.allocate(vector, 0, 0, 0);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
index 949acf3..13f5dd8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
@@ -556,6 +556,25 @@ public class TestJoinNullable extends BaseTestQuery {
     }
   }
 
+  // Full join with USING clause uses COALESCE internally
+  @Test // DRILL-6962
+  public void testFullJoinUsingUntypedNullColumn() throws Exception {
+    try {
+      enableJoin(true, true);
+      String query = "select * from " +
+          "(select n_nationkey, n_name, coalesce(unk1, unk2) as not_exists from cp.`tpch/nation.parquet`) t1 full join " +
+          "(select r_name, r_comment, coalesce(unk1, unk2) as not_exists from cp.`tpch/region.parquet`) t2 " +
+          "using (not_exists)";
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .expectsNumRecords(30)
+          .go();
+    } finally {
+      resetJoinOptions();
+    }
+  }
+
   public void nullMixedComparatorEqualJoinHelper(final String query) throws Exception {
     testBuilder()
         .sqlQuery(query)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
index 4976947..521531c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java
@@ -18,7 +18,11 @@
 package org.apache.drill;
 
 import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
@@ -36,6 +40,8 @@ import static org.junit.Assert.assertTrue;
 @Category(SqlFunctionTest.class)
 public class TestUntypedNull extends ClusterTest {
 
+  private static final TypeProtos.MajorType UNTYPED_NULL_TYPE = Types.optional(TypeProtos.MinorType.NULL);
+
   @BeforeClass
   public static void setup() throws Exception {
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
@@ -106,5 +112,118 @@ public class TestUntypedNull extends ClusterTest {
     assertEquals(0, summary.recordCount());
   }
 
+  @Test
+  public void testCoalesceOnNotExistentColumns() throws Exception {
+    String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5";
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("coal", UNTYPED_NULL_TYPE)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .go();
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("coal")
+        .baselineValuesForSingleColumn(null, null, null, null, null)
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsWithGroupBy() throws Exception {
+    String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` group by 1";
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("coal", UNTYPED_NULL_TYPE)
+        .build();
+
+    testBuilder()
+      .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .go();
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("coal")
+        .baselineValuesForSingleColumn(new Object[] {null})
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsWithOrderBy() throws Exception {
+    String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` order by 1 limit 5";
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("coal", UNTYPED_NULL_TYPE)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .go();
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("coal")
+        .baselineValuesForSingleColumn(null, null, null, null, null)
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsWithCoalesceInWhereClause() throws Exception {
+    String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` where coalesce(unk1, unk2) > 10";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .expectsNumRecords(0)
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsWithCoalesceInHavingClause() throws Exception {
+    String query = "select 1 from cp.`tpch/nation.parquet` group by n_name having count(coalesce(unk1, unk2)) > 10";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .expectsNumRecords(0)
+        .go();
+  }
+
+  @Test
+  public void testPartitionByCoalesceOnNotExistentColumns() throws Exception {
+    String query =
+        "select row_number() over (partition by coalesce(unk1, unk2)) as row_num from cp.`tpch/nation.parquet` limit 5";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("row_num")
+        .baselineValuesForSingleColumn(1L, 2L, 3L, 4L, 5L)
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsInUDF() throws Exception {
+    String query = "select substr(coalesce(unk1, unk2), 1, 2) as coal from cp.`tpch/nation.parquet` limit 5";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("coal")
+        .baselineValuesForSingleColumn(null, null, null, null, null)
+        .go();
+  }
+
+  @Test
+  public void testCoalesceOnNotExistentColumnsInUDF2() throws Exception {
+    String query = "select abs(coalesce(unk1, unk2)) as coal from cp.`tpch/nation.parquet` limit 5";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("coal")
+        .baselineValuesForSingleColumn(null, null, null, null, null)
+        .go();
+  }
 }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
index 0d884b9..73b4b94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,8 +31,13 @@ import org.apache.drill.categories.SqlFunctionTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.IntervalYearVector;
 import org.apache.drill.test.ClusterFixture;
@@ -46,6 +52,17 @@ import org.junit.rules.ExpectedException;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
+import static org.apache.drill.common.types.TypeProtos.MinorType.BIGINT;
+import static org.apache.drill.common.types.TypeProtos.MinorType.BIT;
+import static org.apache.drill.common.types.TypeProtos.MinorType.DATE;
+import static org.apache.drill.common.types.TypeProtos.MinorType.FLOAT4;
+import static org.apache.drill.common.types.TypeProtos.MinorType.FLOAT8;
+import static org.apache.drill.common.types.TypeProtos.MinorType.INT;
+import static org.apache.drill.common.types.TypeProtos.MinorType.INTERVALYEAR;
+import static org.apache.drill.common.types.TypeProtos.MinorType.TIME;
+import static org.apache.drill.common.types.TypeProtos.MinorType.TIMESTAMP;
+import static org.apache.drill.common.types.TypeProtos.MinorType.VARCHAR;
+import static org.apache.drill.common.types.TypeProtos.MinorType.VARDECIMAL;
 import static org.apache.drill.exec.ExecTest.mockUtcDateTimeZone;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -749,4 +766,57 @@ public class TestCastFunctions extends ClusterTest {
       run("drop table if exists dfs.tmp.test_time_filter");
     }
   }
+
+  @Test
+  public void testCastUntypedNull() throws Exception {
+    String query = "select cast(coalesce(unk1, unk2) as %s) as coal from cp.`tpch/nation.parquet` limit 1";
+
+    Map<String, TypeProtos.MajorType> typesMap = createCastTypeMap();
+    for (Map.Entry<String, TypeProtos.MajorType> entry : typesMap.entrySet()) {
+      String q = String.format(query, entry.getKey());
+
+      MaterializedField field = MaterializedField.create("coal", entry.getValue());
+      BatchSchema expectedSchema = new SchemaBuilder()
+          .add(field)
+          .build();
+
+      // Validate schema
+      testBuilder()
+          .sqlQuery(q)
+          .schemaBaseLine(expectedSchema)
+          .go();
+
+      // Validate result
+      testBuilder()
+          .sqlQuery(q)
+          .unOrdered()
+          .baselineColumns("coal")
+          .baselineValues(new Object[] {null})
+          .go();
+    }
+  }
+
+  private static Map<String, TypeProtos.MajorType> createCastTypeMap() {
+    TypeProtos.DataMode mode = TypeProtos.DataMode.OPTIONAL;
+    Map<String, TypeProtos.MajorType> typesMap = new HashMap<>();
+    typesMap.put("BOOLEAN", Types.withMode(BIT, mode));
+    typesMap.put("INT", Types.withMode(INT, mode));
+    typesMap.put("BIGINT", Types.withMode(BIGINT, mode));
+    typesMap.put("FLOAT", Types.withMode(FLOAT4, mode));
+    typesMap.put("DOUBLE", Types.withMode(FLOAT8, mode));
+    typesMap.put("DATE", Types.withMode(DATE, mode));
+    typesMap.put("TIME", Types.withMode(TIME, mode));
+    typesMap.put("TIMESTAMP", Types.withMode(TIMESTAMP, mode));
+    typesMap.put("INTERVAL MONTH", Types.withMode(INTERVALYEAR, mode));
+    typesMap.put("INTERVAL YEAR", Types.withMode(INTERVALYEAR, mode));
+    // todo: uncomment after DRILL-6993 is resolved
+    // typesMap.put("VARBINARY(31)", Types.withPrecision(VARBINARY, mode, 31));
+    typesMap.put("VARCHAR(26)", Types.withPrecision(VARCHAR, mode, 26));
+    typesMap.put("DECIMAL(9, 2)", Types.withScaleAndPrecision(VARDECIMAL, mode, 2, 9));
+    typesMap.put("DECIMAL(18, 5)", Types.withScaleAndPrecision(VARDECIMAL, mode, 5, 18));
+    typesMap.put("DECIMAL(28, 3)", Types.withScaleAndPrecision(VARDECIMAL, mode, 3, 28));
+    typesMap.put("DECIMAL(38, 2)", Types.withScaleAndPrecision(VARDECIMAL, mode, 2, 38));
+
+    return typesMap;
+  }
 }
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 7025f18..c8b8c5e 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -470,12 +470,11 @@ public class DrillCursor implements Cursor {
         QueryDataBatch qrb = resultsListener.getNext();
 
         // (Apparently:)  Skip any spurious empty batches (batches that have
-        // zero rows and/or null data, other than the first batch (which carries
+        // zero rows and null data, other than the first batch (which carries
         // the (initial) schema but no rows)).
-        if ( afterFirstBatch ) {
-          while ( qrb != null
-                  && ( qrb.getHeader().getRowCount() == 0
-                      || qrb.getData() == null ) ) {
+        if (afterFirstBatch) {
+          while (qrb != null
+              && (qrb.getHeader().getRowCount() == 0 && qrb.getData() == null)) {
             // Empty message--dispose of and try to get another.
             logger.warn( "Spurious batch read: {}", qrb );
 
diff --git a/exec/vector/src/main/codegen/templates/AbstractFieldReader.java b/exec/vector/src/main/codegen/templates/AbstractFieldReader.java
index 4a763c2..7400d39 100644
--- a/exec/vector/src/main/codegen/templates/AbstractFieldReader.java
+++ b/exec/vector/src/main/codegen/templates/AbstractFieldReader.java
@@ -29,9 +29,9 @@ package org.apache.drill.exec.vector.complex.impl;
  * This class is generated using freemarker and the ${.template_name} template.
  */
 @SuppressWarnings("unused")
-abstract class AbstractFieldReader extends AbstractBaseReader implements FieldReader {
+public abstract class AbstractFieldReader extends AbstractBaseReader implements FieldReader {
 
-  AbstractFieldReader() {
+  public AbstractFieldReader() {
   }
 
   /**
diff --git a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
index 383e195..f1de685 100644
--- a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
+++ b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
@@ -64,6 +64,8 @@ public class BasicTypeHelper {
       case FIXEDCHAR: return major.getPrecision();
       case FIXED16CHAR: return major.getPrecision();
       case FIXEDBINARY: return major.getPrecision();
+      case NULL:
+        return 0;
     }
     throw new UnsupportedOperationException(buildErrorMessage("get size", major));
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedHolderReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedHolderReaderImpl.java
similarity index 92%
rename from exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedHolderReaderImpl.java
rename to exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedHolderReaderImpl.java
index 3e10d78..66f03f1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedHolderReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedHolderReaderImpl.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.impl;
+package org.apache.drill.exec.vector;
 
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.complex.impl.AbstractFieldReader;
 
 public class UntypedHolderReaderImpl extends AbstractFieldReader {
 
@@ -47,5 +47,4 @@ public class UntypedHolderReaderImpl extends AbstractFieldReader {
   public boolean isSet() {
     return false;
   }
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
index 9dd8480..b83d872 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedNullVector.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector;
 
-
-import org.apache.drill.exec.vector.complex.impl.UntypedReaderImpl;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,7 +45,6 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
 
   public UntypedNullVector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    valueCount = 0;
   }
 
   @Override
@@ -77,7 +74,9 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
   public boolean allocateNewSafe() { return true; }
 
   @Override
-  public void allocateNew(final int valueCount) { }
+  public void allocateNew(final int valueCount) {
+    this.valueCount = valueCount;
+  }
 
   @Override
   public void reset() { }
@@ -125,7 +124,10 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
     return new TransferImpl((UntypedNullVector) to);
   }
 
-  public void transferTo(UntypedNullVector target) { }
+  public void transferTo(UntypedNullVector target) {
+    target.valueCount = valueCount;
+    clear();
+  }
 
   public void splitAndTransferTo(int startIndex, int length, UntypedNullVector target) { }
 
@@ -170,7 +172,11 @@ public final class UntypedNullVector extends BaseDataValueVector implements Fixe
 
   @Override
   public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
-    ((UntypedNullVector) from).data.getBytes(fromIndex * 4, data, toIndex * 4, 4);
+  }
+
+  @Override
+  public void clear() {
+    valueCount = 0;
   }
 
   public final class Accessor extends BaseAccessor {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReader.java
similarity index 90%
rename from exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReader.java
rename to exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReader.java
index f15b852..ec24f2c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReader.java
@@ -15,9 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.impl;
+package org.apache.drill.exec.vector;
 
-import org.apache.drill.exec.vector.UntypedNullHolder;
 import org.apache.drill.exec.vector.complex.reader.BaseReader;
 
 public interface UntypedReader extends BaseReader {
@@ -26,5 +25,4 @@ public interface UntypedReader extends BaseReader {
   int size();
   void read(UntypedNullHolder holder);
   void read(int arrayIndex, UntypedNullHolder holder);
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReaderImpl.java
similarity index 92%
rename from exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReaderImpl.java
rename to exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReaderImpl.java
index da6f63e..924156e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/UntypedReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/UntypedReaderImpl.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.vector.complex.impl;
+package org.apache.drill.exec.vector;
 
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.complex.impl.AbstractFieldReader;
 
 public class UntypedReaderImpl extends AbstractFieldReader {
 
@@ -46,5 +46,4 @@ public class UntypedReaderImpl extends AbstractFieldReader {
   public void read(int arrayIndex, UntypedNullHolder holder) {
     holder.isSet = 0;
   }
-
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java
index 488306e..de165d0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java
@@ -17,14 +17,12 @@
  */
 package org.apache.drill.exec.vector.complex.reader;
 
-import org.apache.drill.exec.vector.complex.impl.UntypedReader;
+import org.apache.drill.exec.vector.UntypedReader;
 import org.apache.drill.exec.vector.complex.reader.BaseReader.ListReader;
 import org.apache.drill.exec.vector.complex.reader.BaseReader.MapReader;
 import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedListReader;
 import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedMapReader;
 import org.apache.drill.exec.vector.complex.reader.BaseReader.ScalarReader;
 
-
-
 public interface FieldReader extends MapReader, ListReader, ScalarReader, RepeatedMapReader, RepeatedListReader, UntypedReader {
-}
\ No newline at end of file
+}


[drill] 03/08: DRILL-6947: Fix RuntimeFilter memory leak

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 1887cce249e79058c7328c2bbf094b3d979e6ab2
Author: weijie.tong <we...@alipay.com>
AuthorDate: Sun Jan 6 15:48:55 2019 +0800

    DRILL-6947: Fix RuntimeFilter memory leak
---
 .../impl/filter/RuntimeFilterRecordBatch.java      | 44 ++++++++++++++++------
 .../exec/physical/impl/join/HashJoinBatch.java     |  8 +++-
 .../exec/work/filter/RuntimeFilterWritable.java    | 21 +++++++++++
 3 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bf7ed79..ac6718c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -224,21 +224,41 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
     setupHashHelper();
     //To make each independent bloom filter work together to construct a final filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
-    for (int i = 0; i < toFilterFields.size(); i++) {
-      BloomFilter bloomFilter = bloomFilters.get(i);
-      String fieldName = toFilterFields.get(i);
-      computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
-    }
+
+    int filterSize = toFilterFields.size();
     int svIndex = 0;
-    for (int i = 0; i < originalRecordCount; i++) {
-      boolean contain = bitSet.get(i);
-      if (contain) {
-        sv2.setIndex(svIndex, i);
-        svIndex++;
-      } else {
-        filteredRows++;
+    if (filterSize == 1) {
+      BloomFilter bloomFilter = bloomFilters.get(0);
+      String fieldName = toFilterFields.get(0);
+      int fieldId = field2id.get(fieldName);
+      for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
+        long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+        boolean contain = bloomFilter.find(hash);
+        if (contain) {
+          sv2.setIndex(svIndex, rowIndex);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
+      }
+    } else {
+      for (int i = 0; i < toFilterFields.size(); i++) {
+        BloomFilter bloomFilter = bloomFilters.get(i);
+        String fieldName = toFilterFields.get(i);
+        computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+      }
+      for (int i = 0; i < originalRecordCount; i++) {
+        boolean contain = bitSet.get(i);
+        if (contain) {
+          sv2.setIndex(svIndex, i);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
       }
     }
+
+
     appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809..30e8af7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -213,6 +213,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
   private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
   private List<BloomFilter> bloomFilters = new ArrayList<>();
+  private boolean bloomFiltersGenerated = false;
 
   /**
    * This holds information about the spilled partitions for the build and probe side.
@@ -818,8 +819,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   }
 
+  /**
+   * Note:
+   * This method can not be called again as part of recursive call of executeBuildPhase() to handle spilled build partitions.
+   */
   private void initializeRuntimeFilter() {
-    if (!enableRuntimeFilter) {
+    if (!enableRuntimeFilter || bloomFiltersGenerated) {
       return;
     }
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
@@ -838,6 +843,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
+    bloomFiltersGenerated = true;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index f8c2701..aebd010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +40,9 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
   private String identifier;
 
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
+    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int bufArrLen = data.length;
+    Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input DrillBuf number does not match the metadata definition!");
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
     this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
@@ -46,6 +50,23 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
+  public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) {
+    this.runtimeFilterBDef = runtimeFilterBDef;
+    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int boomFilterNum = bfSizeInBytes.size();
+    this.data = new DrillBuf[boomFilterNum];
+    int index = 0;
+    for (int i = 0; i < boomFilterNum; i++) {
+      int length = bfSizeInBytes.get(i);
+      this.data[i] = data.slice(index, length);
+      index = index + length;
+    }
+
+    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+                      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+                      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
+  }
+
 
   public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
     return runtimeFilterBDef;


[drill] 07/08: DRILL-6985: Fix sqlline.bat issues on Windows and add drill-embedded.bat

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5026cd12e317e71c4dac1561c347991a36c94e4d
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Mon Jan 21 03:47:59 2019 +0200

    DRILL-6985: Fix sqlline.bat issues on Windows and add drill-embedded.bat
    
    closes #1616
---
 distribution/src/assemble/component.xml       |  6 ++++
 distribution/src/resources/drill-embedded.bat | 24 ++++++++++++++++
 distribution/src/resources/sqlline.bat        | 41 +++++++++++----------------
 3 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 2b60299..a79a4fa 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -311,6 +311,11 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
+      <source>src/resources/drill-embedded.bat</source>
+      <fileMode>0755</fileMode>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
       <source>src/resources/drill-localhost</source>
       <fileMode>0755</fileMode>
       <outputDirectory>bin</outputDirectory>
@@ -327,6 +332,7 @@
     </file>
     <file>
       <source>src/resources/sqlline.bat</source>
+      <fileMode>0755</fileMode>
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
diff --git a/distribution/src/resources/drill-embedded.bat b/distribution/src/resources/drill-embedded.bat
new file mode 100644
index 0000000..bf38db1
--- /dev/null
+++ b/distribution/src/resources/drill-embedded.bat
@@ -0,0 +1,24 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+@REM
+
+@echo off
+setlocal EnableExtensions EnableDelayedExpansion
+
+set BIN_DIR=%~dp0
+pushd %BIN_DIR%
+call sqlline.bat -u "jdbc:drill:zk=local" %*
diff --git a/distribution/src/resources/sqlline.bat b/distribution/src/resources/sqlline.bat
index 40f2771..360b758 100755
--- a/distribution/src/resources/sqlline.bat
+++ b/distribution/src/resources/sqlline.bat
@@ -17,27 +17,19 @@
 @REM
 
 @echo off
-@rem/*
-@rem * Licensed to the Apache Software Foundation (ASF) under one
-@rem * or more contributor license agreements.  See the NOTICE file
-@rem * distributed with this work for additional information
-@rem * regarding copyright ownership.  The ASF licenses this file
-@rem * to you under the Apache License, Version 2.0 (the
-@rem * "License"); you may not use this file except in compliance
-@rem * with the License.  You may obtain a copy of the License at
-@rem *
-@rem *     http://www.apache.org/licenses/LICENSE-2.0
-@rem *
-@rem * Unless required by applicable law or agreed to in writing, software
-@rem * distributed under the License is distributed on an "AS IS" BASIS,
-@rem * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@rem * See the License for the specific language governing permissions and
-@rem * limitations under the License.
-@rem */
-@rem 
 setlocal EnableExtensions EnableDelayedExpansion
 
 rem ----
+rem Sets Drill home and bin dirs before shift is applied
+rem to preserve correct paths
+rem ----
+
+set DRILL_BIN_DIR=%~dp0
+pushd %DRILL_BIN_DIR%..
+set DRILL_HOME=%cd%
+popd
+
+rem ----
 rem In order to pass in arguments with an equals symbol, use quotation marks.
 rem For example
 rem sqlline -u "jdbc:drill:zk=local" -n admin -p admin
@@ -54,6 +46,7 @@ set atleastonearg=0
 
 if x%1 == x-q (
   set QUERY=%2
+  set QUERY=!QUERY:"=!
   set atleastonearg=1
   shift
   shift
@@ -61,6 +54,7 @@ if x%1 == x-q (
 
 if x%1 == x-e (
   set QUERY=%2
+  set QUERY=!QUERY:"=!
   set atleastonearg=1
   shift
   shift
@@ -68,6 +62,7 @@ if x%1 == x-e (
 
 if x%1 == x-f (
   set FILE=%2
+  set FILE=!FILE:"=!
   set atleastonearg=1
   shift
   shift
@@ -76,6 +71,7 @@ if x%1 == x-f (
 if x%1 == x--config (
   set confdir=%2
   set DRILL_CONF_DIR=%2
+  set DRILL_CONF_DIR=!DRILL_CONF_DIR:"=!
   set atleastonearg=1
   shift
   shift
@@ -127,11 +123,6 @@ rem ----
 rem Deal with Drill variables
 rem ----
 
-set DRILL_BIN_DIR=%~dp0
-pushd %DRILL_BIN_DIR%..
-set DRILL_HOME=%cd%
-popd
-
 if "test%DRILL_CONF_DIR%" == "test" (
   set DRILL_CONF_DIR=%DRILL_HOME%\conf
 )
@@ -219,10 +210,10 @@ if errorlevel 1 (
 
 set SQLLINE_CALL=sqlline.SqlLine -ac org.apache.drill.exec.client.DrillSqlLineApplication -d org.apache.drill.jdbc.Driver
 if NOT "test%QUERY%"=="test" (
-  echo %QUERY% | "%JAVA_CMD%" %DRILL_SHELL_JAVA_OPTS% %DRILL_JAVA_OPTS% -cp "%DRILL_CP%" %SQLLINE_CALL% %DRILL_ARGS%
+  "%JAVA_CMD%" %DRILL_SHELL_JAVA_OPTS% %DRILL_JAVA_OPTS% -cp "%DRILL_CP%" %SQLLINE_CALL% %DRILL_ARGS% -e "%QUERY%"
 ) else (
   if NOT "test%FILE%"=="test" (
-    "%JAVA_CMD%" %DRILL_SHELL_JAVA_OPTS% %DRILL_JAVA_OPTS% -cp "%DRILL_CP%" %SQLLINE_CALL% %DRILL_ARGS% --run=%FILE%
+    "%JAVA_CMD%" %DRILL_SHELL_JAVA_OPTS% %DRILL_JAVA_OPTS% -cp "%DRILL_CP%" %SQLLINE_CALL% %DRILL_ARGS% --run="%FILE%"
   ) else (
     "%JAVA_CMD%" %DRILL_SHELL_JAVA_OPTS% %DRILL_JAVA_OPTS% -cp "%DRILL_CP%" %SQLLINE_CALL% %DRILL_ARGS%
   )


[drill] 04/08: DRILL-6999: Fix the case that there's more than one join conditions

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 4e03d54cd854c08a5ed96a67e7c27f02fa5ff435
Author: weijie.tong <we...@alipay.com>
AuthorDate: Thu Jan 24 22:06:40 2019 +0800

    DRILL-6999: Fix the case that there's more than one join conditions
    
    closes #1600
---
 .../drill/exec/rpc/data/DataServerRequestHandler.java  | 13 ++++++++++++-
 .../drill/exec/work/filter/RuntimeFilterWritable.java  | 18 ------------------
 2 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
index e60fcae..5ad7ba4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 // package private
@@ -106,7 +107,17 @@ class DataServerRequestHandler implements RequestHandler<DataServerConnection> {
     if (dBody == null) {
       return;
     }
-    RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, (DrillBuf) dBody);
+    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int boomFilterNum = bfSizeInBytes.size();
+    DrillBuf data = (DrillBuf) dBody;
+    DrillBuf[] bufs = new DrillBuf[boomFilterNum];
+    int index = 0;
+    for (int i = 0; i < boomFilterNum; i++) {
+      int length = bfSizeInBytes.get(i);
+      bufs[i] = data.slice(index, length);
+      index = index + length;
+    }
+    RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, bufs);
     AckSender ackSender = new AckSender(sender);
     ackSender.increment();
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index aebd010..566781b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -50,24 +50,6 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
-  public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) {
-    this.runtimeFilterBDef = runtimeFilterBDef;
-    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
-    int boomFilterNum = bfSizeInBytes.size();
-    this.data = new DrillBuf[boomFilterNum];
-    int index = 0;
-    for (int i = 0; i < boomFilterNum; i++) {
-      int length = bfSizeInBytes.get(i);
-      this.data[i] = data.slice(index, length);
-      index = index + length;
-    }
-
-    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
-                      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
-                      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
-  }
-
-
   public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
     return runtimeFilterBDef;
   }

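The handler-side fix above takes the single DrillBuf received over the wire and slices it into one buffer per bloom filter, using the per-filter byte sizes carried in the RuntimeFilterBDef metadata. A minimal, self-contained sketch of that slicing idea, written against java.nio.ByteBuffer rather than Drill's DrillBuf (names and types here are illustrative only):

  import java.nio.ByteBuffer;
  import java.util.List;

  final class BufferSlicer {
    // Splits one composite buffer into zero-copy views, one per declared size.
    static ByteBuffer[] sliceBySizes(ByteBuffer composite, List<Integer> sizesInBytes) {
      ByteBuffer[] parts = new ByteBuffer[sizesInBytes.size()];
      int offset = 0;
      for (int i = 0; i < sizesInBytes.size(); i++) {
        int length = sizesInBytes.get(i);
        ByteBuffer view = composite.duplicate();   // independent position/limit, shared content
        view.position(offset);
        view.limit(offset + length);
        parts[i] = view.slice();                   // segment [offset, offset + length)
        offset += length;
      }
      return parts;
    }
  }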

[drill] 08/08: DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b557b796dc1ca7796b0db956e39df1d52f212f12
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Jan 23 17:38:44 2019 -0800

    DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete
    
    closes #1621
---
 .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index f69a44e..c0eceae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable
       joinMjId2Stopwatch.put(joinMjId, stopwatch);
     }
     synchronized (rfQueue) {
+      if (!running.get()) {
+        runtimeFilterWritable.close();
+        return;
+      }
       rfQueue.add(runtimeFilterWritable);
       rfQueue.notify();
     }
@@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable
           aggregate(toAggregate);
         } catch (Exception ex) {
           logger.error("Failed to aggregate or route the RFW", ex);
+
+          // Set running to false and cleanup pending RFW in queue. This will make sure producer
+          // thread is also indicated to stop and queue is cleaned up properly in failure cases
+          synchronized (rfQueue) {
+            running.set(false);
+          }
+          cleanupQueue();
           throw new DrillRuntimeException(ex);
         } finally {
-          if (toAggregate != null) {
             toAggregate.close();
-          }
         }
       }
+      cleanupQueue();
+    }
 
+    private void cleanupQueue() {
       if (!running.get()) {
         RuntimeFilterWritable toClose;
         while ((toClose = rfQueue.poll()) != null) {

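The core of this change is a shutdown handshake between the threads that produce RuntimeFilterWritable objects and the thread that aggregates them: on failure the aggregating side flips the shared running flag while holding the queue lock, then drains and closes every pending filter, and producers that arrive afterwards close their buffers instead of enqueuing them. A simplified, self-contained sketch of that pattern (hypothetical class and method names, not Drill's API):

  import java.io.Closeable;
  import java.io.IOException;
  import java.util.ArrayDeque;
  import java.util.Queue;
  import java.util.concurrent.atomic.AtomicBoolean;

  final class ClosingQueue<T extends Closeable> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Producer side: once the consumer has stopped, release the item instead of enqueuing it.
    void offer(T item) throws IOException {
      synchronized (queue) {
        if (!running.get()) {
          item.close();
          return;
        }
        queue.add(item);
        queue.notify();
      }
    }

    // Consumer side: called when aggregation fails; stop accepting work and clean up what is pending.
    void shutdownAndDrain() {
      synchronized (queue) {
        running.set(false);
        T pending;
        while ((pending = queue.poll()) != null) {
          try {
            pending.close();
          } catch (IOException e) {
            // best-effort cleanup; nothing useful to do with the failure here
          }
        }
      }
    }
  }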

[drill] 06/08: DRILL-6977: Improve Hive tests configuration

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a00f1800b95a007b0890f97fd260a6085911438b
Author: Igor Guzenko <ih...@gmail.com>
AuthorDate: Fri Jan 18 20:27:54 2019 +0200

    DRILL-6977: Improve Hive tests configuration
    
    1. HiveTestBase data initialization moved to a static block
       so that it runs once for all derived test classes.
    2. Extracted Hive driver and storage plugin management from HiveTestDataGenerator
       into the HiveTestFixture class. This increased the cohesion of the generator and
       loosened the coupling between Hive test configuration and data generation
       tasks.
    3. Replaced usage of Guava ImmutableLists with TestBaseViewSupport
       helper methods based on standard JDK collections.
    
    closes #1613
---
 .../org/apache/drill/exec/hive/HiveTestBase.java   |  44 ++-
 .../apache/drill/exec/hive/HiveTestFixture.java    | 295 +++++++++++++++++++++
 .../apache/drill/exec/hive/HiveTestUtilities.java  |  36 +++
 .../apache/drill/exec/hive/TestHiveStorage.java    |  34 +--
 .../hive/BaseTestHiveImpersonation.java            |  18 +-
 .../exec/sql/hive/TestViewSupportOnHiveTables.java |  47 ++--
 .../exec/store/hive/HiveTestDataGenerator.java     | 137 ++--------
 .../apache/drill/exec/sql/TestBaseViewSupport.java |  38 +++
 .../org/apache/drill/exec/sql/TestViewSupport.java | 106 +++-----
 9 files changed, 510 insertions(+), 245 deletions(-)

diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
index 5758eec..c3acdb0 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java
@@ -17,28 +17,58 @@
  */
 package org.apache.drill.exec.hive;
 
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.runner.Description;
 
 /**
  * Base class for Hive test. Takes care of generating and adding Hive test plugin before tests and deleting the
  * plugin after tests.
  */
 public class HiveTestBase extends PlanTestBase {
-  protected static HiveTestDataGenerator hiveTest;
+
+  public static final HiveTestFixture HIVE_TEST_FIXTURE;
+
+  static {
+    // generate hive data common for all test classes using own dirWatcher
+    BaseDirTestWatcher generalDirWatcher = new BaseDirTestWatcher() {
+      {
+        /*
+           Below protected method invoked to create directory DirWatcher.dir with path like:
+           ./target/org.apache.drill.exec.hive.HiveTestBase123e4567-e89b-12d3-a456-556642440000.
+           Then subdirectory with name 'root' will be used to hold metastore_db and other data shared between
+           all derivatives of the class. Note that UUID suffix is necessary to avoid conflicts between forked JVMs.
+        */
+        starting(Description.createSuiteDescription(HiveTestBase.class.getName().concat(UUID.randomUUID().toString())));
+      }
+    };
+    File baseDir = generalDirWatcher.getRootDir();
+    HIVE_TEST_FIXTURE = HiveTestFixture.builder(baseDir).build();
+    HiveTestDataGenerator dataGenerator = new HiveTestDataGenerator(generalDirWatcher, baseDir,
+        HIVE_TEST_FIXTURE.getWarehouseDir());
+    HIVE_TEST_FIXTURE.getDriverManager().runWithinSession(dataGenerator::generateData);
+
+    // set hook for clearing watcher's dir on JVM shutdown
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> FileUtils.deleteQuietly(generalDirWatcher.getDir())));
+  }
 
   @BeforeClass
-  public static void generateHive() throws Exception {
-    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
-    hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
+  public static void setUp() {
+    HIVE_TEST_FIXTURE.getPluginManager().addHivePluginTo(bits);
   }
 
   @AfterClass
-  public static void cleanupHiveTestData() throws Exception{
-    if (hiveTest != null) {
-      hiveTest.deleteHiveTestPlugin(getDrillbitContext().getStorage());
+  public static void tearDown() {
+    if (HIVE_TEST_FIXTURE != null) {
+      HIVE_TEST_FIXTURE.getPluginManager().removeHivePluginFrom(bits);
     }
   }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
new file mode 100644
index 0000000..5078272
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestFixture.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.hive.HiveStoragePlugin;
+import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
+import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
+
+
+/**
+ * Test fixture for configuration of Hive tests, which
+ * allows granular control over initialization of test data
+ * and hive storage plugin.
+ * <p>
+ * Below is example of usage HiveTestFixture along with ClusterFixture:
+ * <p>
+ * <pre><code>
+ *   // Note that HiveTestFixture doesn't require extension of ClusterTest,
+ *   // this is just the simplest way for configuring test drillbit
+ *   public class HiveTestExample extends ClusterTest {
+ *
+ *       private static HiveTestFixture hiveTestFixture;
+ *
+ *       {@literal @}BeforeClass
+ *       public static void setUp() throws Exception {
+ *            startCluster(ClusterFixture.builder(dirTestWatcher));
+ *
+ *            // Below is minimal config which uses defaults from HiveTestFixture.Builder
+ *            // constructor, but any option for driver or storage plugin may be
+ *            // overridden using builder's methods
+ *            hiveTestFixture = HiveTestFixture.builder(dirTestWatcher).build();
+ *
+ *            // Use driver manager to configure test data in Hive metastore
+ *            hiveTestFixture.getDriverManager().runWithinSession(HiveTestExample::generateData);
+ *
+ *            // Use plugin manager to add, remove, update hive storage plugin of one or many test drillbits
+ *            hiveTestFixture.getPluginManager().addHivePluginTo(cluster.drillbits());
+ *       }
+ *
+ *       private static void generateData(Driver driver) {
+ *            // Set up data using HiveTestUtilities.executeQuery(driver, sql)
+ *       }
+ *
+ *       {@literal @}AfterClass
+ *       public static void tearDown() throws Exception {
+ *            if (nonNull(hiveTestFixture)) {
+ *                hiveTestFixture.getPluginManager().removeHivePluginFrom(cluster.drillbits());
+ *            }
+ *       }
+ * }
+ * </code></pre>
+ */
+public class HiveTestFixture {
+
+  private final Map<String, String> pluginConf;
+
+  private final Map<String, String> driverConf;
+
+  private final String pluginName;
+
+  private final HivePluginManager pluginManager;
+
+  private final HiveDriverManager driverManager;
+
+  private HiveTestFixture(Builder builder) {
+    this.pluginConf = new HashMap<>(builder.pluginConf);
+    this.driverConf = new HashMap<>(builder.driverConf);
+    this.pluginName = builder.pluginName;
+    this.pluginManager = new HivePluginManager();
+    this.driverManager = new HiveDriverManager();
+  }
+
+  public static Builder builder(BaseDirTestWatcher dirWatcher) {
+    return builder(requireNonNull(dirWatcher, "Parameter 'dirWatcher' can't be null!").getRootDir());
+  }
+
+  public static Builder builder(File baseDir) {
+    return new Builder(requireNonNull(baseDir, "Parameter 'baseDir' can't be null!"));
+  }
+
+  public HivePluginManager getPluginManager() {
+    return pluginManager;
+  }
+
+  public HiveDriverManager getDriverManager() {
+    return driverManager;
+  }
+
+  /**
+   * Returns current value of 'hive.metastore.warehouse.dir' option
+   * which expected to represent location of metastore warehouse directory.
+   * Builder's user can override any option either of pluginConf or driverConf.
+   * Since setting of the option is not enforced, this method just tries to
+   * find it in any of the conf maps.
+   *
+   * @return current value of 'hive.metastore.warehouse.dir' option
+   *         from pluginConf or driverConf
+   */
+  public String getWarehouseDir() {
+    String warehouseDir = pluginConf.get(ConfVars.METASTOREWAREHOUSE.varname);
+    return nonNull(warehouseDir) ? warehouseDir : driverConf.get(ConfVars.METASTOREWAREHOUSE.varname);
+  }
+
+  public static class Builder {
+
+    private final Map<String, String> pluginConf;
+
+    private final Map<String, String> driverConf;
+
+    private String pluginName;
+
+    private Builder(File baseDir) {
+      this.pluginConf = new HashMap<>();
+      this.driverConf = new HashMap<>();
+      String jdbcUrl = String.format("jdbc:derby:;databaseName=%s;create=true",
+          new File(baseDir, "metastore_db").getAbsolutePath());
+      String warehouseDir = new File(baseDir, "warehouse").getAbsolutePath();
+      // Drill Hive Storage plugin defaults
+      pluginName("hive");
+      pluginOption(ConfVars.METASTOREURIS, "");
+      pluginOption(ConfVars.METASTORECONNECTURLKEY, jdbcUrl);
+      pluginOption(ConfVars.METASTOREWAREHOUSE, warehouseDir);
+      pluginOption(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+      // Hive Driver defaults
+      driverOption(ConfVars.METASTORECONNECTURLKEY, jdbcUrl);
+      driverOption(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+      driverOption(ConfVars.METASTOREWAREHOUSE, warehouseDir);
+      driverOption("mapred.job.tracker", "local");
+      driverOption(ConfVars.SCRATCHDIR, createDirWithPosixPermissions(baseDir, "scratch_dir").getAbsolutePath());
+      driverOption(ConfVars.LOCALSCRATCHDIR, createDirWithPosixPermissions(baseDir, "local_scratch_dir").getAbsolutePath());
+      driverOption(ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+      driverOption(ConfVars.METASTORE_AUTO_CREATE_ALL, Boolean.toString(true));
+      driverOption(ConfVars.METASTORE_SCHEMA_VERIFICATION, Boolean.toString(false));
+      driverOption(ConfVars.HIVE_CBO_ENABLED, Boolean.toString(false));
+    }
+
+    public Builder pluginOption(ConfVars option, String value) {
+      return pluginOption(option.varname, value);
+    }
+
+    public Builder pluginOption(String option, String value) {
+      return put(pluginConf, option, value);
+    }
+
+    public Builder driverOption(ConfVars option, String value) {
+      return driverOption(option.varname, value);
+    }
+
+    public Builder driverOption(String option, String value) {
+      return put(driverConf, option, value);
+    }
+
+    public Builder pluginName(String name) {
+      this.pluginName = Objects.requireNonNull(name, "Hive plugin name can't be null!");
+      return this;
+    }
+
+    private Builder put(Map<String, String> map, String key, String value) {
+      map.put(key, value);
+      return this;
+    }
+
+    public HiveTestFixture build() {
+      return new HiveTestFixture(this);
+    }
+
+  }
+
+  /**
+   * Implements addition, update and deletion of
+   * Hive storage plugin for drillbits passed from outside.
+   * The class was made inner because it uses pluginName and pluginConf
+   * of enclosing fixture instance.
+   */
+  public class HivePluginManager {
+
+    /**
+     *  {@link HiveTestFixture}'s constructor will create instance,
+     *  and API users will get it via {@link HiveTestFixture#getPluginManager()}.
+     */
+    private HivePluginManager() {
+    }
+
+    public void addHivePluginTo(Drillbit... drillbits) {
+      addHivePluginTo(Arrays.asList(drillbits));
+    }
+
+    public void addHivePluginTo(Iterable<Drillbit> drillbits) {
+      try {
+        for (Drillbit drillbit : drillbits) {
+          HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(new HashMap<>(pluginConf));
+          pluginConfig.setEnabled(true);
+          drillbit.getContext().getStorage().createOrUpdate(pluginName, pluginConfig, true);
+        }
+      } catch (ExecutionSetupException e) {
+        throw new RuntimeException("Failed to add Hive storage plugin to drillbits", e);
+      }
+    }
+
+    public void removeHivePluginFrom(Drillbit... drillbits) {
+      removeHivePluginFrom(Arrays.asList(drillbits));
+    }
+
+    public void removeHivePluginFrom(Iterable<Drillbit> drillbits) {
+      drillbits.forEach(bit -> bit.getContext().getStorage().deletePlugin(pluginName));
+    }
+
+    public void updateHivePlugin(Iterable<Drillbit> drillbits,
+                                 Map<String, String> configOverride) {
+      try {
+        for (Drillbit drillbit : drillbits) {
+          StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+          HiveStoragePlugin storagePlugin = Objects.requireNonNull(
+              (HiveStoragePlugin) pluginRegistry.getPlugin(pluginName),
+              String.format("Hive storage plugin with name '%s' doesn't exist.", pluginName));
+
+          HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
+          newPluginConfig.getConfigProps().putAll(configOverride);
+          pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true);
+        }
+      } catch (ExecutionSetupException e) {
+        throw new RuntimeException("Failed to update Hive storage plugin for drillbits", e);
+      }
+    }
+
+  }
+
+
+  /**
+   * Implements method for initialization and passing
+   * of Hive to consumer instances in order to be used
+   * for test data generation within session.
+   * The class was made inner because it uses driverConf
+   * of enclosing fixture instance.
+   */
+  public class HiveDriverManager {
+
+    /**
+     *  {@link HiveTestFixture}'s constructor will create instance,
+     *  and API users will get it via {@link HiveTestFixture#getDriverManager()}.
+     */
+    private HiveDriverManager() {
+    }
+
+    public void runWithinSession(Consumer<Driver> driverConsumer) {
+      final HiveConf hiveConf = new HiveConf(SessionState.class);
+      driverConf.forEach(hiveConf::set);
+      SessionState ss = new SessionState(hiveConf);
+      try (Closeable ssClose = ss::close) {
+        SessionState.start(ss);
+        driverConsumer.accept(new Driver(hiveConf));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception was thrown while closing SessionState", e);
+      }
+    }
+
+  }
+
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
index 0556c93..7457511 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
@@ -17,6 +17,14 @@
  */
 package org.apache.drill.exec.hive;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.EnumSet;
+import java.util.Set;
+
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -24,6 +32,12 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 public class HiveTestUtilities {
 
   /**
+   * Set of all posix permissions to be assigned to newly created file in
+   * {@link HiveTestUtilities#createDirWithPosixPermissions(File, String)}
+   */
+  private static final Set<PosixFilePermission> ALL_POSIX_PERMISSIONS = EnumSet.allOf(PosixFilePermission.class);
+
+  /**
    * Execute the give <i>query</i> on given <i>hiveDriver</i> instance. If a {@link CommandNeedRetryException}
    * exception is thrown, it tries upto 3 times before returning failure.
    * @param hiveDriver
@@ -47,4 +61,26 @@ public class HiveTestUtilities {
           query, (response != null ? response.getErrorMessage() : "")));
     }
   }
+
+  /**
+   * Creates desired directory structure and
+   * adds all posix permissions to created directory.
+   *
+   * @param parentDir parent directory
+   * @param dirName directory name
+   * @return file representing created dir with all posix permissions
+   */
+  public static File createDirWithPosixPermissions(File parentDir, String dirName) {
+    File dir = new File(parentDir, dirName);
+    dir.mkdirs();
+    Path path = dir.toPath();
+    try {
+      Files.setPosixFilePermissions(path, ALL_POSIX_PERMISSIONS);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to set all posix permissions for directory [%s]", dir), e);
+    }
+    return dir;
+  }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 0685906..7355de7 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -17,12 +17,9 @@
  */
 package org.apache.drill.exec.hive;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -43,8 +40,11 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 
 @Category({SlowTest.class, HiveStorageTest.class})
 public class TestHiveStorage extends HiveTestBase {
@@ -211,8 +211,10 @@ public class TestHiveStorage extends HiveTestBase {
   @Test
   public void queryingTablesInNonDefaultFS() throws Exception {
     // Update the default FS settings in Hive test storage plugin to non-local FS
-    hiveTest.updatePluginConfig(getDrillbitContext().getStorage(),
-        ImmutableMap.of(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9001"));
+
+    HIVE_TEST_FIXTURE.getPluginManager().updateHivePlugin(
+        Collections.singleton(bits[0]),
+        Collections.singletonMap(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9001"));
 
     testBuilder()
         .sqlQuery("SELECT * FROM hive.`default`.kv LIMIT 1")
@@ -224,8 +226,8 @@ public class TestHiveStorage extends HiveTestBase {
 
   @Test // DRILL-745
   public void queryingHiveAvroTable() throws Exception {
-      testBuilder()
-          .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
+    testBuilder()
+        .sqlQuery("SELECT * FROM hive.db1.avro ORDER BY key DESC LIMIT 1")
         .unOrdered()
         .baselineColumns("key", "value")
         .baselineValues(5, " key_5")
@@ -274,7 +276,6 @@ public class TestHiveStorage extends HiveTestBase {
   }
 
 
-
   @Test // DRILL-3739
   public void readingFromStorageHandleBasedTable() throws Exception {
     testBuilder()
@@ -287,7 +288,7 @@ public class TestHiveStorage extends HiveTestBase {
 
   @Test // DRILL-3688
   public void readingFromSmallTableWithSkipHeaderAndFooter() throws Exception {
-   testBuilder()
+    testBuilder()
         .sqlQuery("select key, `value` from hive.skipper.kv_text_small order by key asc")
         .ordered()
         .baselineColumns("key", "value")
@@ -312,7 +313,7 @@ public class TestHiveStorage extends HiveTestBase {
         .sqlQuery("select sum(key) as sum_keys from hive.skipper.kv_text_large")
         .unOrdered()
         .baselineColumns("sum_keys")
-        .baselineValues((long)(5000*(5000 + 1)/2))
+        .baselineValues((long) (5000 * (5000 + 1) / 2))
         .go();
 
     testBuilder()
@@ -383,7 +384,7 @@ public class TestHiveStorage extends HiveTestBase {
   public void testStringColumnsMetadata() throws Exception {
     String query = "select varchar_field, char_field, string_field from hive.readtest";
 
-    Map<String, Integer> expectedResult = Maps.newHashMap();
+    Map<String, Integer> expectedResult = new HashMap<>();
     expectedResult.put("varchar_field", 50);
     expectedResult.put("char_field", 10);
     expectedResult.put("string_field", HiveVarchar.MAX_VARCHAR_LENGTH);
@@ -394,7 +395,7 @@ public class TestHiveStorage extends HiveTestBase {
     try {
       test("alter session set `%s` = true", ExecConstants.EARLY_LIMIT0_OPT_KEY);
       verifyColumnsMetadata(client.createPreparedStatement(String.format("select * from (%s) t limit 0", query)).get()
-              .getPreparedStatement().getColumnsList(), expectedResult);
+          .getPreparedStatement().getColumnsList(), expectedResult);
     } finally {
       test("alter session reset `%s`", ExecConstants.EARLY_LIMIT0_OPT_KEY);
     }
@@ -450,4 +451,5 @@ public class TestHiveStorage extends HiveTestBase {
       assertTrue("Column should be nullable", columnMetadata.getIsNullable());
     }
   }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
index 53088ed..39f8655 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/impersonation/hive/BaseTestHiveImpersonation.java
@@ -17,10 +17,15 @@
  */
 package org.apache.drill.exec.impersonation.hive;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.calcite.schema.Schema.TableType;
-import org.apache.drill.test.TestBuilder;
 import org.apache.drill.exec.impersonation.BaseTestImpersonation;
 import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
+import org.apache.drill.test.TestBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -30,13 +35,8 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
-import static org.apache.drill.exec.store.hive.HiveTestDataGenerator.createFileWithPermissions;
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
@@ -65,8 +65,8 @@ public class BaseTestHiveImpersonation extends BaseTestImpersonation {
   protected static void prepHiveConfAndData() throws Exception {
     hiveConf = new HiveConf();
 
-    File scratchDir = createFileWithPermissions(dirTestWatcher.getRootDir(), "scratch_dir");
-    File localScratchDir = createFileWithPermissions(dirTestWatcher.getRootDir(), "local_scratch_dir");
+    File scratchDir = createDirWithPosixPermissions(dirTestWatcher.getRootDir(), "scratch_dir");
+    File localScratchDir = createDirWithPosixPermissions(dirTestWatcher.getRootDir(), "local_scratch_dir");
     File metaStoreDBDir = new File(dirTestWatcher.getRootDir(), "metastore_db");
 
     // Configure metastore persistence db location on local filesystem
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
index 821200d..5b53113 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/sql/hive/TestViewSupportOnHiveTables.java
@@ -17,26 +17,34 @@
  */
 package org.apache.drill.exec.sql.hive;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.Objects;
+
 import org.apache.drill.categories.HiveStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.sql.TestBaseViewSupport;
-import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.drill.exec.hive.HiveTestBase.HIVE_TEST_FIXTURE;
 import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
 
 @Category({SlowTest.class, HiveStorageTest.class})
 public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
-  protected static HiveTestDataGenerator hiveTest;
 
   @BeforeClass
-  public static void generateHive() throws Exception{
-    hiveTest = HiveTestDataGenerator.getInstance(dirTestWatcher);
-    hiveTest.addHiveTestPlugin(getDrillbitContext().getStorage());
+  public static void setUp() throws Exception {
+    Objects.requireNonNull(HIVE_TEST_FIXTURE, "Failed to configure Hive storage plugin, " +
+        "because HiveTestBase.HIVE_TEST_FIXTURE isn't initialized!")
+        .getPluginManager().addHivePluginTo(bits);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (HIVE_TEST_FIXTURE != null) {
+      HIVE_TEST_FIXTURE.getPluginManager().removeHivePluginFrom(bits);
+    }
   }
 
   @Test
@@ -46,8 +54,8 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         null,
         "SELECT * FROM hive.kv",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "key", "value"},
-        ImmutableList.of(new Object[] { 1, " key_1" })
+        baselineColumns("key", "value"),
+        baselineRows(row( 1, " key_1" ))
     );
   }
 
@@ -58,8 +66,8 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         null,
         "SELECT * FROM hive.kv",
         "SELECT key, `value` FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "key", "value" },
-        ImmutableList.of(new Object[] { 1, " key_1" })
+        baselineColumns("key", "value"),
+        baselineRows(row(1, " key_1"))
     );
   }
 
@@ -70,8 +78,8 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         null,
         "SELECT * FROM hive.kv",
         "SELECT `value` FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "value" },
-        ImmutableList.of(new Object[] { " key_1" })
+        baselineColumns("value"),
+        baselineRows(row(" key_1"))
     );
   }
 
@@ -82,8 +90,8 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         null,
         "SELECT key, `value` FROM hive.kv",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "key", "value" },
-        ImmutableList.of(new Object[] { 1, " key_1" })
+        baselineColumns("key", "value"),
+        baselineRows(row(1, " key_1"))
     );
   }
 
@@ -94,8 +102,8 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         null,
         "SELECT key, `value` FROM hive.kv",
         "SELECT key, `value` FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "key", "value" },
-        ImmutableList.of(new Object[] { 1, " key_1" })
+        baselineColumns("key", "value"),
+        baselineRows(row(1, " key_1"))
     );
   }
 
@@ -110,11 +118,4 @@ public class TestViewSupportOnHiveTables extends TestBaseViewSupport {
         .go();
   }
 
-  @AfterClass
-  public static void cleanupHiveTestData() throws Exception{
-    if (hiveTest != null) {
-      hiveTest.deleteHiveTestPlugin(getDrillbitContext().getStorage());
-    }
-  }
-
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 0b9cd36..a60f0a6 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -18,152 +18,46 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.nio.file.attribute.PosixFilePermission;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
 import org.apache.hadoop.hive.serde.serdeConstants;
 
+import static org.apache.drill.exec.hive.HiveTestUtilities.createDirWithPosixPermissions;
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
 
 public class HiveTestDataGenerator {
-  private static final String HIVE_TEST_PLUGIN_NAME = "hive";
-  private static HiveTestDataGenerator instance;
-  private static File baseDir;
 
-  private final String dbDir;
-  private final String whDir;
   private final BaseDirTestWatcher dirTestWatcher;
-  private final Map<String, String> config;
-
-  public static synchronized HiveTestDataGenerator getInstance(BaseDirTestWatcher dirTestWatcher) throws Exception {
-    File baseDir = dirTestWatcher.getRootDir();
-    if (instance == null || !HiveTestDataGenerator.baseDir.equals(baseDir)) {
-      HiveTestDataGenerator.baseDir = baseDir;
-
-      File dbDirFile = new File(baseDir, "metastore_db");
-      File whDirFile = new File(baseDir, "warehouse");
-
-      final String dbDir = dbDirFile.getAbsolutePath();
-      final String whDir = whDirFile.getAbsolutePath();
 
-      instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
-      instance.generateTestData();
-    }
+  private final File baseDir;
 
-    return instance;
-  }
+  private final String warehouseDir;
 
-  private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
-    this.dbDir = dbDir;
-    this.whDir = whDir;
+  public HiveTestDataGenerator(BaseDirTestWatcher dirTestWatcher, File baseDir, String warehouseDir) {
     this.dirTestWatcher = dirTestWatcher;
-
-    config = new HashMap<>();
-    config.put(ConfVars.METASTOREURIS.toString(), "");
-    config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
-    config.put("hive.metastore.warehouse.dir", whDir);
-    config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
-  }
-
-  /**
-   * Add Hive test storage plugin to the given plugin registry.
-   *
-   * @param pluginRegistry storage plugin registry
-   * @throws Exception in case if unable to update Hive storage plugin
-   */
-  public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) throws Exception {
-    HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
-    pluginConfig.setEnabled(true);
-
-    pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, pluginConfig, true);
-  }
-
-  /**
-   * Update the current HiveStoragePlugin in given plugin registry with given <i>configOverride</i>.
-   *
-   * @param pluginRegistry storage plugin registry
-   * @param configOverride config properties to be overridden
-   * @throws DrillException if fails to update or no Hive plugin currently exists in given plugin registry.
-   */
-  public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, Map<String, String> configOverride)
-      throws DrillException {
-    HiveStoragePlugin storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(HIVE_TEST_PLUGIN_NAME);
-    if (storagePlugin == null) {
-      throw new DrillException(
-          "Hive test storage plugin doesn't exist. Add a plugin using addHiveTestPlugin()");
-    }
-
-    HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
-    newPluginConfig.getConfigProps().putAll(configOverride);
-
-    pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true);
-  }
-
-  /**
-   * Delete the Hive test plugin from registry.
-   */
-  public void deleteHiveTestPlugin(final StoragePluginRegistry pluginRegistry) {
-    pluginRegistry.deletePlugin(HIVE_TEST_PLUGIN_NAME);
+    this.baseDir = baseDir;
+    this.warehouseDir = warehouseDir;
   }
 
-  public static File createFileWithPermissions(File baseDir, String name) {
-    Set<PosixFilePermission> perms = Sets.newHashSet(PosixFilePermission.values());
-    File dir = new File(baseDir, name);
-    dir.mkdirs();
-
+  public void generateData(Driver hiveDriver) {
     try {
-      Files.setPosixFilePermissions(dir.toPath(), perms);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      generateDataInternal(hiveDriver);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception was thrown while generating test data", e);
     }
-
-    return dir;
   }
 
-  private void generateTestData() throws Exception {
-    HiveConf conf = new HiveConf(SessionState.class);
-
-    File scratchDir = createFileWithPermissions(baseDir, "scratch_dir");
-    File localScratchDir = createFileWithPermissions(baseDir, "local_scratch_dir");
-    File part1Dir = createFileWithPermissions(baseDir, "part1");
-
-    conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
-    conf.set("hive.metastore.warehouse.dir", whDir);
-    conf.set("mapred.job.tracker", "local");
-    conf.set(ConfVars.SCRATCHDIR.varname,  scratchDir.getAbsolutePath());
-    conf.set(ConfVars.LOCALSCRATCHDIR.varname, localScratchDir.getAbsolutePath());
-    conf.set(ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
-    conf.set(ConfVars.METASTORE_AUTO_CREATE_ALL.varname, "true");
-    conf.set(ConfVars.METASTORE_SCHEMA_VERIFICATION.varname, "false");
-    conf.set(ConfVars.HIVE_CBO_ENABLED.varname, "false");
-
-    SessionState ss = new SessionState(conf);
-    SessionState.start(ss);
-    Driver hiveDriver = new Driver(conf);
-
+  private void generateDataInternal(Driver hiveDriver) throws Exception {
+    File part1Dir = createDirWithPosixPermissions(baseDir, "part1");
     // generate (key, value) test data
     String testDataFile = generateTestDataFile();
 
@@ -199,7 +93,7 @@ public class HiveTestDataGenerator {
     // create a table with no data
     executeQuery(hiveDriver, "CREATE TABLE IF NOT EXISTS empty_table(a INT, b STRING)");
     // delete the table location of empty table
-    File emptyTableLocation = new File(whDir, "empty_table");
+    File emptyTableLocation = new File(warehouseDir, "empty_table");
     if (emptyTableLocation.exists()) {
       FileUtils.forceDelete(emptyTableLocation);
     }
@@ -504,7 +398,7 @@ public class HiveTestDataGenerator {
         "'org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler'");
     // Insert fails if the table directory already exists for tables with DefaultStorageHandlers. It's a known
     // issue in Hive, so delete the table directory created as part of the CREATE TABLE.
-    FileUtils.deleteQuietly(new File(whDir, "kv_sh"));
+    FileUtils.deleteQuietly(new File(warehouseDir, "kv_sh"));
     //executeQuery(hiveDriver, "INSERT OVERWRITE TABLE kv_sh SELECT * FROM kv");
 
     // Create text tables with skip header and footer table property
@@ -552,7 +446,6 @@ public class HiveTestDataGenerator {
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW view_over_hive_view AS SELECT * FROM hive_view WHERE key BETWEEN 2 AND 3");
     executeQuery(hiveDriver, "CREATE OR REPLACE VIEW db1.two_table_view AS SELECT COUNT(dk.key) dk_key_count FROM db1.avro dk " +
         "INNER JOIN kv ON kv.key = dk.key");
-    ss.close();
   }
 
   private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
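
For context on the refactoring above: the generator no longer creates its own Hive session; the HiveConf/SessionState/Driver setup that was removed must now be supplied by the caller (per the summary of changes, that wiring moves into the new HiveTestFixture). Below is a minimal sketch, not part of the patch, assembled only from pieces visible in this diff; baseDir, warehouseDir and dirTestWatcher are assumed to be provided by the test harness, and the actual fixture API may differ.

    // Hedged sketch: configure a Hive session the way the removed generateTestData() did,
    // then hand the Driver to the new generateData() entry point.
    // (Uses org.apache.hadoop.hive.conf.HiveConf, ql.session.SessionState and ql.Driver,
    // the same classes the removed code imported.)
    String dbDir = new File(baseDir, "metastore_db").getAbsolutePath();   // as in the removed code
    HiveConf conf = new HiveConf(SessionState.class);
    conf.set("javax.jdo.option.ConnectionURL",
        String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
    conf.set("hive.metastore.warehouse.dir", warehouseDir);
    SessionState.start(new SessionState(conf));
    Driver hiveDriver = new Driver(conf);

    // New constructor and entry point introduced by this change:
    HiveTestDataGenerator generator =
        new HiveTestDataGenerator(dirTestWatcher, baseDir, warehouseDir);
    generator.generateData(hiveDriver);
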
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
index e9f7aab..8569d2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
@@ -21,6 +21,8 @@ import org.apache.drill.PlanTestBase;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.test.TestBuilder;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -209,4 +211,40 @@ public class TestBaseViewSupport extends PlanTestBase {
       dropViewHelper(finalSchema, viewName, finalSchema);
     }
   }
+
+  /**
+   * Convenience method for defining baselineColumns to be passed into
+   * view helper methods.
+   *
+   * @param names column names as varargs
+   * @return column names array
+   */
+  protected static String[] baselineColumns(String... names) {
+    return names;
+  }
+
+  /**
+   * Convenience method for grouping expected rows into a list of
+   * Object arrays, where each array represents a concrete row.
+   * Used for defining baselineValues passed to view helper methods.
+   *
+   * @param rows rows as Object[] varargs
+   * @return list of rows
+   */
+  protected static List<Object[]> baselineRows(Object[]... rows) {
+    return Collections.unmodifiableList(Arrays.asList(rows));
+  }
+
+  /**
+   * Helper method for converting Object varargs into an
+   * array of objects. Used for passing rows into
+   * {@link TestBaseViewSupport#baselineRows(Object[]...)}
+   *
+   * @param columns Object varargs
+   * @return array of passed objects
+   */
+  protected static Object[] row(Object... columns) {
+    return columns;
+  }
 }
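
Since the three helpers above are plain varargs pass-throughs, baselines that previously needed an explicit String[] plus an ImmutableList of Object[] can now be written inline. A minimal sketch of the resulting call shape (the query text is illustrative; queryViewHelper is the existing helper these tests already use, as seen in the hunks below):

    // Before: new String[] { "key", "value" },
    //         ImmutableList.of(new Object[] { 1, " key_1" })
    // After, using the helpers defined above:
    queryViewHelper(
        "SELECT key, `value` FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
        baselineColumns("key", "value"),
        baselineRows(row(1, " key_1")));
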
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
index a2eefcc..b8d5495 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.categories.UnlikelyTest;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -104,9 +103,10 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT * FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[] { "region_id", "sales_city", "sales_state_province", "sales_district", "sales_region",
-            "sales_country", "sales_district_id" },
-        ImmutableList.of(new Object[] { 0L, "None", "None", "No District", "No Region", "No Country", 0L })
+        baselineColumns("region_id", "sales_city", "sales_state_province", "sales_district",
+                        "sales_region", "sales_country", "sales_district_id"),
+        baselineRows(row(0L, "None", "None", "No District",
+                        "No Region", "No Country", 0L))
     );
   }
 
@@ -117,11 +117,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "region_id", "sales_city" },
-        ImmutableList.of(
-            new Object[] { 0L, "None" },
-            new Object[] { 1L, "San Francisco" }
-        )
+        baselineColumns("region_id", "sales_city"),
+        baselineRows(row(0L, "None"), row(1L, "San Francisco"))
     );
   }
 
@@ -132,11 +129,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         "(regionid, salescity)",
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "regionid", "salescity" },
-        ImmutableList.of(
-            new Object[] { 0L, "None" },
-            new Object[] { 1L, "San Francisco" }
-        )
+        baselineColumns("regionid", "salescity"),
+        baselineRows(row(0L, "None"), row(1L, "San Francisco"))
     );
   }
 
@@ -147,11 +141,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT * FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT region_id, sales_city FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "region_id", "sales_city" },
-        ImmutableList.of(
-            new Object[] { 0L, "None" },
-            new Object[] { 1L, "San Francisco" }
-        )
+        baselineColumns("region_id", "sales_city"),
+        baselineRows(row(0L, "None"), row(1L, "San Francisco"))
     );
   }
 
@@ -162,11 +153,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT region_id, sales_city FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "region_id", "sales_city" },
-        ImmutableList.of(
-            new Object[] { 0L, "None" },
-            new Object[] { 1L, "San Francisco" }
-        )
+        baselineColumns("region_id", "sales_city"),
+        baselineRows(row(0L, "None"), row(1L, "San Francisco"))
     );
   }
 
@@ -177,11 +165,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id`",
         "SELECT sales_city FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "sales_city" },
-        ImmutableList.of(
-            new Object[] { "None" },
-            new Object[] { "San Francisco" }
-        )
+        baselineColumns("sales_city"),
+        baselineRows(row("None"), row("San Francisco"))
     );
   }
 
@@ -192,11 +177,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         "(regionid, salescity)",
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id` LIMIT 2",
         "SELECT regionid, salescity FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[] { "regionid", "salescity" },
-        ImmutableList.of(
-            new Object[] { 0L, "None" },
-            new Object[] { 1L, "San Francisco" }
-        )
+        baselineColumns("regionid", "salescity"),
+        baselineRows(row(0L, "None"), row(1L, "San Francisco"))
     );
   }
 
@@ -207,11 +189,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         "(regionid, salescity)",
         "SELECT region_id, sales_city FROM cp.`region.json` ORDER BY `region_id` DESC",
         "SELECT regionid FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[]{"regionid"},
-        ImmutableList.of(
-            new Object[]{109L},
-            new Object[]{108L}
-        )
+        baselineColumns("regionid"),
+        baselineRows(row(109L), row(108L))
     );
   }
 
@@ -223,11 +202,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         "SELECT region_id FROM cp.`region.json` UNION SELECT employee_id FROM cp.`employee.json`",
         "SELECT regionid FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 2",
-        new String[]{"regionid"},
-        ImmutableList.of(
-            new Object[]{110L},
-            new Object[]{108L}
-        )
+        baselineColumns("regionid"),
+        baselineRows(row(110L), row(108L))
     );
   }
 
@@ -245,8 +221,8 @@ public class TestViewSupport extends TestBaseViewSupport {
 
       queryViewHelper(
           String.format("SELECT region_id FROM %s.`%s` LIMIT 1", DFS_TMP_SCHEMA, outerView),
-          new String[] { "region_id" },
-          ImmutableList.of(new Object[] { 0L })
+          baselineColumns("region_id"),
+          baselineRows(row(0L))
       );
     } finally {
       dropViewHelper(DFS_TMP_SCHEMA, outerView, DFS_TMP_SCHEMA);
@@ -269,10 +245,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         null,
         viewDef,
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME LIMIT 1",
-        new String[]{"n_nationkey", "n_name", "n_regionkey", "n_comment"},
-        ImmutableList.of(
-            new Object[]{0, "ALGERIA", 0, " haggle. carefully final deposits detect slyly agai"}
-        )
+        baselineColumns("n_nationkey", "n_name", "n_regionkey", "n_comment"),
+        baselineRows(row(0, "ALGERIA", 0, " haggle. carefully final deposits detect slyly agai"))
     );
   }
 
@@ -304,8 +278,8 @@ public class TestViewSupport extends TestBaseViewSupport {
 
       // Make sure the new view created returns the data expected.
       queryViewHelper(String.format("SELECT * FROM %s.`%s` LIMIT 1", DFS_TMP_SCHEMA, viewName),
-          new String[]{"sales_state_province"},
-          ImmutableList.of(new Object[]{"None"})
+          baselineColumns("sales_state_province"),
+          baselineRows(row("None"))
       );
     } finally {
       dropViewHelper(DFS_TMP_SCHEMA, viewName, DFS_TMP_SCHEMA);
@@ -361,8 +335,8 @@ public class TestViewSupport extends TestBaseViewSupport {
 
       // Make sure the view created returns the data expected.
       queryViewHelper(String.format("SELECT * FROM %s.`%s` LIMIT 1", DFS_TMP_SCHEMA, viewName),
-        new String[]{"region_id", "sales_city"},
-        ImmutableList.of(new Object[]{0L, "None"})
+          baselineColumns("region_id", "sales_city"),
+          baselineRows(row(0L, "None"))
       );
     } finally {
       dropViewHelper(DFS_TMP_SCHEMA, viewName, DFS_TMP_SCHEMA);
@@ -382,8 +356,8 @@ public class TestViewSupport extends TestBaseViewSupport {
 
       // Make sure the view created returns the data expected.
       queryViewHelper(String.format("SELECT * FROM %s.`%s` LIMIT 1", DFS_TMP_SCHEMA, viewName),
-        new String[]{"region_id", "sales_city"},
-        ImmutableList.of(new Object[]{0L, "None"})
+          baselineColumns("region_id", "sales_city"),
+          baselineRows(row(0L, "None"))
       );
     } finally {
       dropViewHelper(DFS_TMP_SCHEMA, viewName, DFS_TMP_SCHEMA);
@@ -484,8 +458,8 @@ public class TestViewSupport extends TestBaseViewSupport {
       createViewHelper("tmp", viewName, DFS_TMP_SCHEMA, null,
           "SELECT CAST(`employee_id` AS INTEGER) AS `employeeid`\n" + "FROM `cp`.`employee.json`");
 
-      final String[] baselineColumns = new String[] { "employeeid" };
-      final List<Object[]> baselineValues = ImmutableList.of(new Object[] { 1156 });
+      final String[] baselineColumns = baselineColumns("employeeid");
+      final List<Object[]> baselineValues = baselineRows(row(1156));
 
       // Query view from current schema "dfs" by referring to the view using "tmp.viewName"
       queryViewHelper(
@@ -521,8 +495,8 @@ public class TestViewSupport extends TestBaseViewSupport {
       // Create a view with full schema identifier and refer the "region.json" as without schema.
       createViewHelper(DFS_TMP_SCHEMA, viewName, DFS_TMP_SCHEMA, null, "SELECT region_id, sales_city FROM `region.json`");
 
-      final String[] baselineColumns = new String[] { "region_id", "sales_city" };
-      final List<Object[]> baselineValues = ImmutableList.of(new Object[]{109L, "Santa Fe"});
+      final String[] baselineColumns = baselineColumns("region_id", "sales_city");
+      final List<Object[]> baselineValues = baselineRows(row(109L, "Santa Fe"));
 
       // Query the view
       queryViewHelper(
@@ -635,10 +609,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         "(regionid1, regionid2)",
         "SELECT region_id, region_id FROM cp.`region.json` LIMIT 1",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME",
-        new String[]{"regionid1", "regionid2"},
-        ImmutableList.of(
-            new Object[]{0L, 0L}
-        )
+        baselineColumns("regionid1", "regionid2"),
+        baselineRows(row(0L, 0L))
     );
   }
 
@@ -651,10 +623,8 @@ public class TestViewSupport extends TestBaseViewSupport {
         "SELECT t1.region_id, t2.region_id FROM cp.`region.json` t1 JOIN cp.`region.json` t2 " +
             "ON t1.region_id = t2.region_id LIMIT 1",
         "SELECT * FROM TEST_SCHEMA.TEST_VIEW_NAME",
-        new String[]{"regionid1", "regionid2"},
-        ImmutableList.of(
-            new Object[]{0L, 0L}
-        )
+        baselineColumns("regionid1", "regionid2"),
+        baselineRows(row(0L, 0L))
     );
   }