Posted to commits@drill.apache.org by dz...@apache.org on 2023/02/08 07:44:18 UTC

[drill] branch master updated: DRILL-8290: Early exit from recursive file listing for LIMIT 0 queries (#2636)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fe6b61372 DRILL-8290: Early exit from recursive file listing for LIMIT 0 queries (#2636)
8fe6b61372 is described below

commit 8fe6b6137293cc2434afa9628868801928285b5e
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Wed Feb 8 09:44:11 2023 +0200

    DRILL-8290: Early exit from recursive file listing for LIMIT 0 queries (#2636)
    
    * New validation logic: error when no files are found and no workspace default format is set.
    * When auto-disabling a plugin, override the exception error type to PLUGIN.
    * Set editorconfig continuation indent to 4.
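    
    The main beneficiaries are schema probing queries of the form
    SELECT * FROM dfs.`/some/big/directory` LIMIT 0 (path illustrative):
    previously Drill listed every file under the FROM path recursively
    before validation could complete; now the listing stops at the first
    matching file.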
---
 .editorconfig                                      |   2 +-
 .../drill/common/exceptions/UserException.java     |  15 +-
 .../org/apache/calcite/jdbc/DynamicRootSchema.java |  23 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |  12 +
 .../planner/logical/SelectionBasedTableScan.java   |   4 +-
 .../planner/sql/handlers/DefaultSqlHandler.java    |  12 +-
 .../planner/sql/parser/FindLimit0SqlVisitor.java   |  87 +++
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../apache/drill/exec/store/dfs/FileSelection.java |  14 +
 .../exec/store/dfs/WorkspaceSchemaFactory.java     |  37 +-
 .../compression/DrillCompressionCodecFactory.java  |  15 +-
 .../store/parquet/metadata/MetadataPathUtils.java  |  23 +
 .../drill/exec/util/DrillFileSystemUtil.java       |  18 +
 .../org/apache/drill/exec/util/FileSystemUtil.java |  45 ++
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../test/java/org/apache/drill/PlanTestBase.java   |  21 -
 .../drill/TestDirScanToValuesConversion.java       |  11 +-
 .../org/apache/drill/TestSelectWithOption.java     |  12 +-
 .../test/java/org/apache/drill/TestUnionAll.java   |  27 +-
 .../java/org/apache/drill/TestUnionDistinct.java   |  86 ++-
 .../org/apache/drill/exec/TestEmptyInputSql.java   |  44 +-
 .../physical/impl/join/TestJoinEmptyDirTable.java  |  56 +-
 .../exec/store/parquet/TestParquetGroupScan.java   |  79 +-
 .../store/parquet/TestParquetMetadataCache.java    | 793 +++++++++++++--------
 .../java/org/apache/drill/test/BaseTestQuery.java  |   4 +-
 25 files changed, 1020 insertions(+), 422 deletions(-)

diff --git a/.editorconfig b/.editorconfig
index 22763960b9..c1d1593b8b 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -41,7 +41,7 @@ ij_any_spaces_around_logical_operators = true
 ij_any_spaces_around_multiplicative_operators = true
 ij_any_spaces_around_relational_operators = true
 ij_any_spaces_around_shift_operators = true
-ij_continuation_indent_size = 2
+ij_continuation_indent_size = 4
 ij_java_if_brace_force = always
 ij_java_indent_case_from_switch = false
 ij_java_space_after_colon = true
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 437dacf267..550d2899cb 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -443,10 +443,10 @@ public class UserException extends DrillRuntimeException {
   public static class Builder {
 
     private final Throwable cause;
-    private final DrillPBError.ErrorType errorType;
     private final UserException uex;
     private final UserExceptionContext context;
 
+    private DrillPBError.ErrorType errorType;
     private String message;
 
     /**
@@ -480,6 +480,19 @@ public class UserException extends DrillRuntimeException {
       message = uex.getOriginalMessage();
     }
 
+    /**
+     * Sets the error type. This method is normally not needed because of
+     * the per-error-type static factory methods above, but it may be
+     * used in conjunction with {@link #rebuild()} to clone an existing
+     * UserException, modified to use a different error type.
+     *
+     * @param errorType the error type to set.
+     * @return this builder.
+     */
+    public Builder errorType(DrillPBError.ErrorType errorType) {
+      this.errorType = errorType;
+      return this;
+    }
+
     /**
      * sets or replaces the error message.
      * <p>This will be ignored if this builder is wrapping a user exception
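
A minimal sketch of how the new errorType(...) setter combines with
rebuild() to re-type a caught UserException while keeping its message and
context (loadSchema() and logger here are hypothetical stand-ins):

  try {
    loadSchema();
  } catch (UserException e) {
    // Clone the caught exception but force the error type to PLUGIN, which
    // DrillSqlWorker uses to decide whether planning should be retried.
    throw e.rebuild()
        .errorType(DrillPBError.ErrorType.PLUGIN)
        .build(logger);
  }
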
diff --git a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
index 098191dcb3..93e6daa064 100644
--- a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.alias.AliasRegistryProvider;
 import org.apache.drill.exec.planner.sql.SchemaUtilities;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePlugin;
@@ -191,12 +192,22 @@ public class DynamicRootSchema extends DynamicSchema {
     } catch (Exception ex) {
       logger.error("Failed to load schema for {}", schemaName, ex);
       // We can't proceed further without a schema, throw a runtime exception.
-      UserException.Builder exceptBuilder =
-          UserException
-              .pluginError(ex)
-              .message("Failed to load schema for schema %s", schemaName)
-              .addContext("%s: %s", ex.getClass().getName(), ex.getMessage())
-              .addContext(UserExceptionUtils.getUserHint(ex)); //Provide hint if it exists
+      // The UserException thrown from here must have an error type of PLUGIN
+      // because that will be used to decide whether planning should be retried
+      // by {@link DrillSqlWorker}.
+      //
+      // If the exception we've caught is already a UserException we have to
+      // jump through a hoop to ensure that the one that we throw from
+      // here has an error type of PLUGIN instead of inheriting that of the
+      // caught UserException. See the logic in the UserException builder
+      // methods.
+      UserException.Builder exceptBuilder = ex instanceof UserException
+        ? ((UserException) ex).rebuild().errorType(DrillPBError.ErrorType.PLUGIN)
+        : UserException
+            .pluginError(ex)
+            .message("Failed to load schema for schema %s", schemaName)
+            .addContext("%s: %s", ex.getClass().getName(), ex.getMessage())
+            .addContext(UserExceptionUtils.getUserHint(ex)); //Provide hint if it exists
 
       if (schemaConfig.getOption(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE).bool_val) {
         String msg = String.format(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c73168a768..696aa804a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -708,6 +708,18 @@ public final class ExecConstants {
   public static final BooleanValidator LATE_LIMIT0_OPT = new BooleanValidator(LATE_LIMIT0_OPT_KEY,
       new OptionDescription("Enables Drill to determine data types as Drill scans data. This optimization is used when the query planner cannot infer types of columns during validation (prior to scanning). Drill exits and terminates the query immediately after resolving the types. When this optimization is applied, the query plan contains a LIMIT (0) above every SCAN, with an optional PROJECT in between. Default is true. (Drill 1.14+)"));
 
+  // Intended only for internal use: a QUERY-scoped flag recording the presence
+  // of a LIMIT 0 in the root portion of a query, used by an early stage (query
+  // validation) optimisation that exits early from the recursive listing of
+  // files matching the paths present in the query's FROM clauses. The option
+  // exec.query.max_rows unfortunately cannot be reused here because a value
+  // of 0 means "no maximum" for it.
+  public static final String FILE_LISTING_LIMIT0_OPT_KEY = "planner.enable_file_listing_limit0_optimization";
+  public static final BooleanValidator FILE_LISTING_LIMIT0_OPT = new BooleanValidator(
+      FILE_LISTING_LIMIT0_OPT_KEY,
+      new OptionDescription("For internal use. Do not change.")
+  );
+
   public static final String ENABLE_MEMORY_ESTIMATION_KEY = "planner.memory.enable_memory_estimation";
   public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY,
       new OptionDescription("Toggles the state of memory estimation and re-planning of the query. When enabled, Drill conservatively estimates memory requirements and typically excludes these operators from the plan and negatively impacts performance."));
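
A sketch of the new option's lifecycle, using only calls that appear in the
changes below (the surrounding variables are assumed to be in scope):

  // Set per query during validation (DefaultSqlHandler.validateNode):
  context.getOptions().setLocalOption(
      ExecConstants.FILE_LISTING_LIMIT0_OPT_KEY, rootSelectLimit0);

  // Read back where directory expansion happens (WorkspaceSchemaFactory):
  boolean exitEarly = schemaConfig
      .getOption(ExecConstants.FILE_LISTING_LIMIT0_OPT_KEY)
      .bool_val;
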
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/SelectionBasedTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/SelectionBasedTableScan.java
index 4a77039707..566242532d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/SelectionBasedTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/SelectionBasedTableScan.java
@@ -31,7 +31,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * This class extends from {@link TableScan}. It puts the file selection string into it's digest.
+ * This class extends from {@link TableScan}. It puts the file selection string into its digest.
  * When directory-based partition pruning applied, file selection could be different for the same
  * table.
  */
@@ -49,7 +49,7 @@ public class SelectionBasedTableScan extends TableScan {
     return new SelectionBasedTableScan(getCluster(), traitSet, table, digestFromSelection);
   }
 
-  /** Creates an DirPrunedTableScan. */
+  /** Creates a SelectionBasedTableScan. */
   public static TableScan create(RelOptCluster cluster,
     RelOptTable relOptTable, String digestFromSelection) {
     Table table = relOptTable.unwrap(Table.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index df1fd6b3e9..d3d7ab9c98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -99,6 +99,7 @@ import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
 import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
 import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
 import org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor;
+import org.apache.drill.exec.planner.sql.parser.FindLimit0SqlVisitor;
 import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
@@ -645,10 +646,19 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       }
       return null;
     }
-
   }
 
   protected Pair<SqlNode, RelDataType> validateNode(SqlNode sqlNode) throws ValidationException, RelConversionException, ForemanSetupException {
+    // Check for a LIMIT 0 in the root portion of the query before validation
+    // because validation of the query's FROM clauses will already trigger
+    // the recursive listing of files to which FILE_LISTING_LIMIT0_OPT is
+    // meant to apply.
+    boolean rootSelectLimit0 = FindLimit0SqlVisitor.containsLimit0(sqlNode);
+    context.getOptions().setLocalOption(
+      ExecConstants.FILE_LISTING_LIMIT0_OPT_KEY,
+      rootSelectLimit0
+    );
+
     final SqlNode sqlNodeValidated = config.getConverter().validate(sqlNode);
     final Pair<SqlNode, RelDataType> typedSqlNode = new Pair<>(sqlNodeValidated, config.getConverter().getOutputType(
         sqlNodeValidated));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/FindLimit0SqlVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/FindLimit0SqlVisitor.java
new file mode 100644
index 0000000000..950d9417f5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/FindLimit0SqlVisitor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.util.SqlShuttle;
+
+import java.util.EnumSet;
+
+/**
+ * A visitor that is very similar to {@link org.apache.drill.exec.planner.sql.handlers.FindLimit0Visitor}
+ * in that it looks for a LIMIT 0 in the root portion of the query tree for the sake of enabling
+ * optimisations, but that differs in the following aspects.
+ *
+ *  1. This visitor enables the
+ *  {@link org.apache.drill.exec.ExecConstants#FILE_LISTING_LIMIT0_OPT} optimisation which takes
+ *  effect during query validation, before the query has been converted to a RelNode tree.
+ *  2. This visitor is less thorough about discovering usable LIMIT 0 nodes because of the
+ *  preceding point. For example, it does not even try to make use of LIMIT 0s that are present
+ *  in CTEs (see SqlKind.WITH_ITEM below). Since the real targets of LIMIT 0 optimisations are
+ *  schema probing queries, which almost always have their LIMIT 0 on the outermost SELECT, this
+ *  visitor should nevertheless do the job sufficiently.
+ *  3. This visitor is interested in whether any scanned input data is needed for the query's
+ *  results, rather than whether there are zero results, e.g. aggregates like SUM return a single
+ *  (null) result even when they have zero inputs.
+ */
+public class FindLimit0SqlVisitor extends SqlShuttle {
+  private static final EnumSet<SqlKind> SEARCH_TERMINATING_NODES = EnumSet.of(
+      SqlKind.JOIN,
+      SqlKind.UNION,
+      SqlKind.EXCEPT,
+      SqlKind.WITH_ITEM
+  );
+
+  private boolean rootContainsLimit0;
+
+  /**
+   * Do a non-exhaustive check of whether the root portion of the SQL node tree contains LIMIT(0)
+   *
+   * @param sql SQL node tree
+   * @return true if the root portion of the tree contains LIMIT(0)
+   */
+  public static boolean containsLimit0(final SqlNode sql) {
+    FindLimit0SqlVisitor visitor = new FindLimit0SqlVisitor();
+    sql.accept(visitor);
+    return visitor.rootContainsLimit0;
+  }
+
+  @Override
+  public SqlNode visit(SqlCall call) {
+    SqlKind kind = call.getKind();
+    if (SEARCH_TERMINATING_NODES.contains(kind)) {
+      return call;
+    }
+    if (kind == SqlKind.ORDER_BY) {
+      SqlOrderBy orderBy = (SqlOrderBy) call;
+      SqlNumericLiteral limitLiteral = (SqlNumericLiteral) orderBy.fetch;
+
+      if (limitLiteral != null && limitLiteral.longValue(true) == 0) {
+        rootContainsLimit0 = true;
+        return call;
+      }
+    }
+    // Continue down the tree.
+    return super.visit(call);
+  }
+}
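
A usage sketch, assuming Calcite's default parser configuration (which
accepts a plain LIMIT clause); parseQuery() throws a checked
SqlParseException:

  import org.apache.calcite.sql.SqlNode;
  import org.apache.calcite.sql.parser.SqlParser;

  SqlNode root = SqlParser.create("SELECT a FROM t LIMIT 0").parseQuery();
  // Calcite represents LIMIT/OFFSET as a SqlOrderBy wrapper, so the visitor
  // sees an ORDER_BY call at the root whose fetch literal is 0.
  boolean probe = FindLimit0SqlVisitor.containsLimit0(root); // true
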
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e3421b4dab..64658ae871 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -238,6 +238,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
       new OptionDefinition(ExecConstants.LATE_LIMIT0_OPT),
+      new OptionDefinition(ExecConstants.FILE_LISTING_LIMIT0_OPT),
       new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION),
       new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE),
       new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 6563c7803a..e13237ea2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -195,6 +195,20 @@ public class FileSelection implements DrillTableSelection {
     return fileSel;
   }
 
+  public FileSelection selectAnyFile(DrillFileSystem fs) throws IOException {
+    List<FileStatus> statuses = getStatuses(fs);
+    List<FileStatus> anyFile = Lists.newArrayList();
+
+    for (FileStatus status : statuses) {
+      anyFile.addAll(DrillFileSystemUtil.anyFile(fs, status.getPath()));
+
+      if (anyFile.size() > 0) {
+        break;
+      }
+    }
+    return create(anyFile, null, selectionRoot);
+  }
+
   public FileStatus getFirstPath(DrillFileSystem fs) throws IOException {
     return getStatuses(fs).get(0);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index d4558c3eb1..63d73a33ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -208,7 +208,7 @@ public class WorkspaceSchemaFactory {
     if (!accessible(fs)) {
       return null;
     }
-    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig, fs);
+    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig, fs, config);
   }
 
   public String getSchemaName() {
@@ -299,12 +299,20 @@ public class WorkspaceSchemaFactory {
     private final DrillFileSystem fs;
     // Drill Process User file-system
     private final DrillFileSystem dpsFs;
-
-    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) {
+    private final WorkspaceConfig wsConfig;
+
+    public WorkspaceSchema(
+      List<String> parentSchemaPath,
+      String wsName,
+      SchemaConfig schemaConfig,
+      DrillFileSystem fs,
+      WorkspaceConfig config
+    ) {
       super(parentSchemaPath, wsName);
       this.schemaConfig = schemaConfig;
       this.fs = fs;
       this.dpsFs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf);
+      this.wsConfig = config;
     }
 
     DrillTable getDrillTable(TableInstance key) {
@@ -672,8 +680,18 @@ public class WorkspaceSchemaFactory {
      * @param hasDirectories flag that indicates if given file selection has directories
      * @return revisited file selection
      */
-    private FileSelection detectEmptySelection(FileSelection fileSelection, boolean hasDirectories) throws IOException {
-      FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
+    private FileSelection expandSelection(FileSelection fileSelection, boolean hasDirectories) throws IOException {
+      FileSelection newSelection;
+
+      if (hasDirectories) {
+        newSelection = schemaConfig.getOption(ExecConstants.FILE_LISTING_LIMIT0_OPT_KEY).bool_val
+            ? fileSelection.selectAnyFile(getFS())
+            : fileSelection.minusDirectories(getFS());
+      } else {
+        // We don't bother with single-file optimisation in this case
+        newSelection = fileSelection;
+      }
+
       if (newSelection == null) {
         // empty directory / selection means that this is the empty and schemaless table
         fileSelection.setEmptyDirectoryStatus();
@@ -893,8 +911,15 @@ public class WorkspaceSchemaFactory {
           }
         }
 
-        newSelection = detectEmptySelection(fileSelection, hasDirectories);
+        newSelection = expandSelection(fileSelection, hasDirectories);
         if (newSelection.isEmptyDirectory()) {
+          if (wsConfig.getDefaultInputFormat() == null) {
+            throw UserException.validationError()
+                .message("No files were found and no default format is set on the queried workspace.")
+                .addContext("workspace", Joiner.on(".").join(getSchemaPath()))
+                .addContext("table", key.sig.getName())
+                .build(logger);
+          }
           return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
         }
 
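
In practical terms (workspace and directory names illustrative): with this
change, SELECT * FROM dfs.tmp.`empty_dir` fails fast with the validation
error above when the tmp workspace defines no defaultInputFormat, where it
previously planned against an empty, schemaless table.
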
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
index f971b33237..97d879e989 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
@@ -17,10 +17,9 @@
  */
 package org.apache.drill.exec.store.parquet.compression;
 
-import java.util.Arrays;
 import java.util.Deque;
+import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
@@ -51,13 +50,11 @@ public class DrillCompressionCodecFactory implements CompressionCodecFactory {
   private static final Logger logger = LoggerFactory.getLogger(DrillCompressionCodecFactory.class);
 
   // The set of codecs to be handled by aircompressor
-  private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
-      Arrays.asList(
-        CompressionCodecName.LZ4,
-        CompressionCodecName.LZO,
-        CompressionCodecName.SNAPPY,
-        CompressionCodecName.ZSTD
-      )
+  private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = EnumSet.of(
+      CompressionCodecName.LZ4,
+      CompressionCodecName.LZO,
+      CompressionCodecName.SNAPPY,
+      CompressionCodecName.ZSTD
   );
 
   // pool of reusable thread-safe aircompressor compressors (parquet-mr's factory has its own)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
index 2cad8e1681..2efc2ec888 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
@@ -17,10 +17,14 @@
  */
 package org.apache.drill.exec.store.parquet.metadata;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
+import java.io.File;
+import java.nio.file.Files;
 import java.util.ArrayList;
 
 import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
@@ -126,4 +130,23 @@ public class MetadataPathUtils {
     return relativeFilePath;
   }
 
+  /**
+   * Helper method for checking metadata file existence.
+   *
+   * @param basePath base path containing tables to be checked for metadata files
+   * @param table table name or table path
+   * @return true if every current metadata file name exists under the table path
+   */
+  @VisibleForTesting
+  public static boolean checkForMetadataFile(String basePath, String table) {
+    for (String filename: Metadata.CURRENT_METADATA_FILENAMES) {
+      File metaFile = table.startsWith(basePath)
+        ? FileUtils.getFile(table, filename)
+        : FileUtils.getFile(basePath, table, filename);
+
+      if (!Files.exists(metaFile.toPath())) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
index bfb83e07c6..6806aa2460 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
@@ -102,6 +102,24 @@ public class DrillFileSystemUtil {
     return FileSystemUtil.listFilesSafe(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
   }
 
+  /**
+   * Returns the status of at most one file present in given path, applying
+   * custom filters if present. Nested directories are searched, but files
+   * and directories that start with dot or underscore are skipped.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param filters list of custom filters (optional)
+   * @return list of at most one matching file status
+   */
+  public static List<FileStatus> anyFile(
+    final FileSystem fs,
+    Path path,
+    PathFilter... filters
+  ) throws IOException {
+    return FileSystemUtil.anyFile(fs, path, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
 
   /**
    * Returns statuses of all directories and files present in given path applying custom filters if present.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index d0b13de2c9..114d89e945 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -127,6 +127,18 @@ public class FileSystemUtil {
     }
   }
 
+  /**
+   * Returns the status of at most one file present in given path, applying
+   * custom filters if present.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param filters list of custom filters (optional)
+   * @return list of at most one matching file status
+   */
+  public static List<FileStatus> anyFile(FileSystem fs, Path path, PathFilter... filters) throws IOException {
+    return anyRecursive(fs, path, Scope.FILES, mergeFilters(filters));
+  }
+
   /**
    * Returns statuses of all directories and files present in given path applying custom filters if present.
    * Will also include nested directories and their files if recursive flag is set to true.
@@ -275,6 +287,39 @@ public class FileSystemUtil {
     }
   }
 
+  /**
+   * Searches depth first for a single file status matching the given file
+   * system object {@link Scope}. A depth first search is expected to be
+   * efficient most of the time given that data files are typically stored
+   * at the leaves of a directory tree. Does not use multithreading due to
+   * its early exit nature.
+   *
+   * @param fs file system
+   * @param path path to file or directory
+   * @param scope file system objects scope
+   * @param filter filter to be applied
+   * @return list containing at most one file status
+   */
+  private static List<FileStatus> anyRecursive(
+    FileSystem fs,
+    Path path,
+    Scope scope,
+    PathFilter filter
+  ) throws IOException {
+    for (FileStatus status : fs.listStatus(path, filter)) {
+      if (isStatusApplicable(status, scope)) {
+        return Collections.singletonList(status);
+      }
+      if (status.isDirectory()) {
+        List<FileStatus> fileList = anyRecursive(fs, status.getPath(), scope, filter);
+        if (fileList.size() > 0) {
+          return fileList;
+        }
+      }
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Checks if file status is applicable based on file system object {@link Scope}.
    *
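
A usage sketch of the early-exit listing against the local file system (the
path is hypothetical); the returned list is either empty or holds exactly
one file status:

  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  FileSystem fs = FileSystem.getLocal(new Configuration());
  List<FileStatus> hit = FileSystemUtil.anyFile(fs, new Path("/data/events"));
  if (!hit.isEmpty()) {
    // The depth-first search stopped at the first file it reached.
    System.out.println("Found: " + hit.get(0).getPath());
  }
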
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4e7ef0abb3..32203ca7a7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -642,6 +642,7 @@ drill.exec.options: {
     planner.enable_join_optimization: true,
     planner.enable_limit0_optimization: true,
     planner.enable_limit0_on_scan: true,
+    planner.enable_file_listing_limit0_optimization: false,
     planner.enable_mergejoin: true,
     planner.enable_multiphase_agg: true,
     planner.enable_mux_exchange: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 5353032381..a71c6b7094 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -35,7 +35,6 @@ import org.apache.drill.test.QueryTestUtil;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Stack;
 import java.util.regex.Matcher;
@@ -494,24 +493,4 @@ public class PlanTestBase extends BaseTestQuery {
     return builder.toString();
   }
 
-  /**
-   * Create a temp metadata directory to query the metadata summary cache file
-   * @param table table name or table path
-   */
-  public static void createMetadataDir(String table) throws IOException {
-    final String tmpDir;
-    try {
-      tmpDir = dirTestWatcher.getRootDir().getCanonicalPath();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    File metadataDir = dirTestWatcher.makeRootSubDir(Paths.get(tmpDir, table, "metadataDir"));
-    File metaFile, newFile;
-    metaFile = table.startsWith(tmpDir) ? FileUtils.getFile(table, Metadata.METADATA_SUMMARY_FILENAME)
-            : FileUtils.getFile(tmpDir, table, Metadata.METADATA_SUMMARY_FILENAME);
-    File tablefile = new File(tmpDir, table);
-    newFile = new File(tablefile, "summary_meta.json");
-    FileUtils.copyFile(metaFile, newFile);
-    FileUtils.copyFileToDirectory(newFile, metadataDir);
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java b/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java
index 1aeeb5ffb0..c30888ce5e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java
@@ -18,10 +18,13 @@
 package org.apache.drill;
 
 import org.apache.drill.categories.PlannerTest;
+import org.apache.drill.exec.store.parquet.metadata.MetadataPathUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertTrue;
+
 import java.nio.file.Paths;
 
 @Category({PlannerTest.class})
@@ -52,7 +55,13 @@ public class TestDirScanToValuesConversion extends PlanTestBase {
   @Test
   public void testDirScanToValuesConversionWithMetadataCache() throws Exception {
     test("refresh table metadata dfs.`%s`", TABLE_WITH_METADATA);
-    checkForMetadataFile(TABLE_WITH_METADATA);
+    assertTrue(
+      String.format("There is no metadata cache file for the %s table", TABLE_WITH_METADATA),
+      MetadataPathUtils.checkForMetadataFile(
+        dirTestWatcher.getRootDir().getCanonicalPath(),
+        TABLE_WITH_METADATA
+      )
+    );
     String query = String.format("select distinct dir0, dir1 from dfs.`%s`", TABLE_WITH_METADATA);
     PlanTestBase.testPlanMatchingPatterns(query, new String[]{"Values\\(tuples="}, null);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index 10bb0ec395..7fe589b9d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -31,6 +31,7 @@ import org.apache.drill.categories.SqlTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
@@ -49,6 +50,15 @@ public class TestSelectWithOption extends ClusterTest {
   @BeforeClass
   public static void setUp() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
   }
 
   private File genCSVFile(String name, String... rows) throws IOException {
@@ -314,7 +324,7 @@ public class TestSelectWithOption extends ClusterTest {
     String tableName = "emptyTable";
     dirTestWatcher.makeTestTmpSubDir(Paths.get(tableName));
     testBuilder()
-      .sqlQuery("select * from table(dfs.tmp.`%s`(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
+      .sqlQuery("select * from table(dfs.tmp_default_format.`%s`(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
       .expectsEmptyResultSet()
       .go();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 5d0864ea25..0121bdf0f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.apache.drill.test.ClusterFixture;
@@ -56,6 +57,15 @@ public class TestUnionAll extends ClusterTest {
     startCluster(ClusterFixture.builder(dirTestWatcher));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "parquet"));
     dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
   }
 
   @Test  // Simple Union-All over two scans
@@ -1264,7 +1274,7 @@ public class TestUnionAll extends ClusterTest {
     String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM cp.`%s` UNION ALL SELECT key FROM dfs.tmp.`%s`",
+        .sqlQuery("SELECT key FROM cp.`%s` UNION ALL SELECT key FROM dfs.tmp_default_format.`%s`",
             rootSimple, EMPTY_DIR_NAME)
         .unOrdered()
         .baselineColumns("key")
@@ -1279,7 +1289,7 @@ public class TestUnionAll extends ClusterTest {
     final String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%s` UNION ALL SELECT key FROM cp.`%s`",
+        .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%s` UNION ALL SELECT key FROM cp.`%s`",
             EMPTY_DIR_NAME, rootSimple)
         .unOrdered()
         .baselineColumns("key")
@@ -1298,7 +1308,11 @@ public class TestUnionAll extends ClusterTest {
         .build();
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM dfs.tmp.`%1$s`", EMPTY_DIR_NAME)
+        .sqlQuery(
+            "SELECT key FROM dfs.tmp_default_format.`%1$s` UNION ALL SELECT key FROM " +
+            "dfs.tmp_default_format.`%1$s`",
+            EMPTY_DIR_NAME
+        )
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
@@ -1307,7 +1321,8 @@ public class TestUnionAll extends ClusterTest {
   @Test
   public void testUnionAllMiddleEmptyDir() throws Exception {
     final String query = "SELECT n_regionkey FROM cp.`tpch/nation.parquet` UNION ALL " +
-        "SELECT missing_key FROM dfs.tmp.`%s` UNION ALL SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+        "SELECT missing_key FROM dfs.tmp_default_format.`%s` UNION ALL SELECT r_regionkey " +
+        "FROM cp.`tpch/region.parquet`";
 
     testBuilder()
         .sqlQuery(query, EMPTY_DIR_NAME)
@@ -1324,8 +1339,8 @@ public class TestUnionAll extends ClusterTest {
     final String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM " +
-            "(SELECT key FROM dfs.tmp.`%1$s` UNION ALL SELECT key FROM cp.`%2$s`)",
+        .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%1$s` UNION ALL SELECT key FROM " +
+            "(SELECT key FROM dfs.tmp_default_format.`%1$s` UNION ALL SELECT key FROM cp.`%2$s`)",
             EMPTY_DIR_NAME, rootSimple)
         .unOrdered()
         .baselineColumns("key")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
index d3ec4e580f..86271cd063 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
@@ -29,9 +29,11 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -41,14 +43,25 @@ import java.nio.file.Paths;
 import java.util.List;
 
 @Category({SqlTest.class, OperatorTest.class})
-public class TestUnionDistinct extends BaseTestQuery {
+public class TestUnionDistinct extends ClusterTest {
   private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
   private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
 
   private static final String EMPTY_DIR_NAME = "empty_directory";
 
   @BeforeClass
-  public static void setupFiles() {
+  public static void setupFiles() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
+
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
     dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
   }
@@ -227,9 +240,9 @@ public class TestUnionDistinct extends BaseTestQuery {
 
   @Test
   public void testUnionDistinctViewExpandableStar() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunion as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
-    test("create view region_view_testunion as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
+    run("use dfs.tmp");
+    run("create view nation_view_testunion as select n_name, n_nationkey from cp.`tpch/nation.parquet`");
+    run("create view region_view_testunion as select r_name, r_regionkey from cp.`tpch/region.parquet`");
 
     String query1 = "(select * from dfs.tmp.`nation_view_testunion`) \n" +
         "union \n" +
@@ -258,33 +271,33 @@ public class TestUnionDistinct extends BaseTestQuery {
           .build()
           .run();
     } finally {
-      test("drop view nation_view_testunion");
-      test("drop view region_view_testunion");
+      run("drop view nation_view_testunion");
+      run("drop view region_view_testunion");
     }
   }
 
   @Test(expected = UnsupportedRelOperatorException.class)
   public void testUnionDistinctViewUnExpandableStar() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunion as select * from cp.`tpch/nation.parquet`;");
+    run("use dfs.tmp");
+    run("create view nation_view_testunion as select * from cp.`tpch/nation.parquet`");
 
     try {
       String query = "(select * from dfs.tmp.`nation_view_testunion`) \n" +
           "union (select * from cp.`tpch/region.parquet`)";
-      test(query);
+      run(query);
     } catch(UserException ex) {
       SqlUnsupportedException.errorClassNameToException(ex.getOrCreatePBError(false).getException().getExceptionClass());
       throw ex;
     } finally {
-      test("drop view nation_view_testunion");
+      run("drop view nation_view_testunion");
     }
   }
 
   @Test
   public void testDiffDataTypesAndModes() throws Exception {
-    test("use dfs.tmp");
-    test("create view nation_view_testunion as select n_name, n_nationkey from cp.`tpch/nation.parquet`;");
-    test("create view region_view_testunion as select r_name, r_regionkey from cp.`tpch/region.parquet`;");
+    run("use dfs.tmp");
+    run("create view nation_view_testunion as select n_name, n_nationkey from cp.`tpch/nation.parquet`");
+    run("create view region_view_testunion as select r_name, r_regionkey from cp.`tpch/region.parquet`");
 
     String t1 = "(select n_comment, n_regionkey from cp.`tpch/nation.parquet` limit 5)";
     String t2 = "(select * from nation_view_testunion  limit 5)";
@@ -303,8 +316,8 @@ public class TestUnionDistinct extends BaseTestQuery {
           .build()
           .run();
     } finally {
-      test("drop view nation_view_testunion");
-      test("drop view region_view_testunion");
+      run("drop view nation_view_testunion");
+      run("drop view region_view_testunion");
     }
   }
 
@@ -388,7 +401,7 @@ public class TestUnionDistinct extends BaseTestQuery {
 
   @Test(expected = UserException.class)
   public void testUnionDistinctImplicitCastingFailure() throws Exception {
-    test("(select key from cp.`store/json/intData.json` " +
+    run("(select key from cp.`store/json/intData.json` " +
       "union select key from cp.`store/json/booleanData.json` )");
   }
 
@@ -720,7 +733,13 @@ public class TestUnionDistinct extends BaseTestQuery {
     final String[] expectedPlan = {"HashAgg.*\n" +
         ".*UnionAll"};
     final String[] excludedPlan = {"HashAgg.*\n.*HashAgg"};
-    PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(expectedPlan)
+      .exclude(excludedPlan)
+      .match();
 
     // Validate the result
     testBuilder()
@@ -746,8 +765,13 @@ public class TestUnionDistinct extends BaseTestQuery {
     final String[] excludedPlan = {};
 
     try {
-      test(sliceTargetSmall);
-      PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
+      run(sliceTargetSmall);
+
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .match();
 
       testBuilder()
         .optionSettingQueriesForTestQuery(sliceTargetSmall)
@@ -758,7 +782,7 @@ public class TestUnionDistinct extends BaseTestQuery {
         .build()
         .run();
     } finally {
-      test(sliceTargetDefault);
+      run(sliceTargetDefault);
     }
   }
 
@@ -789,7 +813,7 @@ public class TestUnionDistinct extends BaseTestQuery {
       union
       select columns[0], columns[1] ... columns[1200] from dfs.`union_for_1200_columns.csv`
        */
-      test("select %1$s from dfs.`%2$s` union select %1$s from dfs.`%2$s`", columns.toString(), file.getName());
+      run("select %1$s from dfs.`%2$s` union select %1$s from dfs.`%2$s`", columns.toString(), file.getName());
     } finally {
       FileUtils.deleteQuietly(file);
     }
@@ -801,7 +825,7 @@ public class TestUnionDistinct extends BaseTestQuery {
     String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM cp.`%s` UNION SELECT key FROM dfs.tmp.`%s`",
+        .sqlQuery("SELECT key FROM cp.`%s` UNION SELECT key FROM dfs.tmp_default_format.`%s`",
             rootSimple, EMPTY_DIR_NAME)
         .unOrdered()
         .baselineColumns("key")
@@ -815,7 +839,7 @@ public class TestUnionDistinct extends BaseTestQuery {
     final String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%s` UNION SELECT key FROM cp.`%s`",
+        .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%s` UNION SELECT key FROM cp.`%s`",
             EMPTY_DIR_NAME, rootSimple)
         .unOrdered()
         .baselineColumns("key")
@@ -834,7 +858,11 @@ public class TestUnionDistinct extends BaseTestQuery {
         .build();
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM dfs.tmp.`%1$s`", EMPTY_DIR_NAME)
+        .sqlQuery(
+            "SELECT key FROM dfs.tmp_default_format.`%1$s` UNION SELECT key FROM " +
+              " dfs.tmp_default_format.`%1$s`",
+            EMPTY_DIR_NAME
+        )
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
@@ -843,7 +871,7 @@ public class TestUnionDistinct extends BaseTestQuery {
   @Test
   public void testUnionMiddleEmptyDir() throws Exception {
     final String query = "SELECT n_regionkey FROM cp.`tpch/nation.parquet` UNION " +
-        "SELECT missing_key FROM dfs.tmp.`%s` UNION SELECT r_regionkey FROM cp.`tpch/region.parquet`";
+        "SELECT missing_key FROM dfs.tmp_default_format.`%s` UNION SELECT r_regionkey FROM cp.`tpch/region.parquet`";
 
     testBuilder()
         .sqlQuery(query, EMPTY_DIR_NAME)
@@ -859,8 +887,8 @@ public class TestUnionDistinct extends BaseTestQuery {
     final String rootSimple = "/store/json/booleanData.json";
 
     testBuilder()
-        .sqlQuery("SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM " +
-                "(SELECT key FROM dfs.tmp.`%1$s` UNION SELECT key FROM cp.`%2$s`)",
+        .sqlQuery("SELECT key FROM dfs.tmp_default_format.`%1$s` UNION SELECT key FROM " +
+                "(SELECT key FROM dfs.tmp_default_format.`%1$s` UNION SELECT key FROM cp.`%2$s`)",
             EMPTY_DIR_NAME, rootSimple)
         .unOrdered()
         .baselineColumns("key")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
index 8cf598412f..a82a3e23d1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestEmptyInputSql.java
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec;
 
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.exec.record.BatchSchemaBuilder;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+
 import java.nio.file.Paths;
 import java.util.List;
 
 @Category(UnlikelyTest.class)
-public class TestEmptyInputSql extends BaseTestQuery {
+public class TestEmptyInputSql extends ClusterTest {
 
   private static final String SINGLE_EMPTY_JSON = "/scan/emptyInput/emptyJson/empty.json";
   private static final String SINGLE_EMPTY_CSVH = "/scan/emptyInput/emptyCsvH/empty.csvh";
@@ -44,7 +47,18 @@ public class TestEmptyInputSql extends BaseTestQuery {
   private static final String EMPTY_DIR_NAME = "empty_directory";
 
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setupTestFiles() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
+
     dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIR_NAME));
   }
 
@@ -144,11 +158,11 @@ public class TestEmptyInputSql extends BaseTestQuery {
   }
 
   private void enableV2Reader(boolean enable) throws Exception {
-    alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable);
+    client.alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable);
   }
 
   private void resetV2Reader() throws Exception {
-    resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY);
+    client.resetSession(ExecConstants.ENABLE_V2_JSON_READER_KEY);
   }
 
   /**
@@ -160,7 +174,7 @@ public class TestEmptyInputSql extends BaseTestQuery {
   @Test
   public void testQueryConstExprEmptyJson() throws Exception {
     try {
-      alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+      client.alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
       SchemaBuilder schemaBuilder = new SchemaBuilder()
           .add("key",
               TypeProtos.MajorType.newBuilder()
@@ -184,7 +198,7 @@ public class TestEmptyInputSql extends BaseTestQuery {
           .build()
           .run();
     } finally {
-      resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+      client.resetSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
     }
   }
 
@@ -230,7 +244,7 @@ public class TestEmptyInputSql extends BaseTestQuery {
         .build();
 
     testBuilder()
-        .sqlQuery("select * from dfs.tmp.`%s`", EMPTY_DIR_NAME)
+        .sqlQuery("select * from dfs.tmp_default_format.`%s`", EMPTY_DIR_NAME)
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
@@ -245,7 +259,7 @@ public class TestEmptyInputSql extends BaseTestQuery {
         .build();
 
     testBuilder()
-        .sqlQuery("select key from dfs.tmp.`%s`", EMPTY_DIR_NAME)
+        .sqlQuery("select key from dfs.tmp_default_format.`%s`", EMPTY_DIR_NAME)
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
@@ -262,7 +276,7 @@ public class TestEmptyInputSql extends BaseTestQuery {
 
     testBuilder()
         .sqlQuery("select WeekId, Product as ProductName from (select CAST(`dir0` as INT) AS WeekId, " +
-            "Product from dfs.tmp.`%s`)", EMPTY_DIR_NAME)
+            "Product from dfs.tmp_default_format.`%s`)", EMPTY_DIR_NAME)
         .schemaBaseLine(expectedSchema)
         .build()
         .run();
@@ -287,8 +301,12 @@ public class TestEmptyInputSql extends BaseTestQuery {
 
   @Test
   public void testEmptyDirectoryPlanSerDe() throws Exception {
-    String query = String.format("select * from dfs.tmp.`%s`", EMPTY_DIR_NAME);
-    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery(query);
+    String plan = queryBuilder()
+      .sql("select * from dfs.tmp_default_format.`%s`", EMPTY_DIR_NAME)
+      .explainJson();
+
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(0, count);
   }
 
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
index 77470d9df2..f6f6c46bd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestJoinEmptyDirTable.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.physical.impl.join;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -35,15 +38,32 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   private static final String EMPTY_DIRECTORY = "empty_directory";
 
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setupTestFiles() throws Exception {
     dirTestWatcher.makeTestTmpSubDir(Paths.get(EMPTY_DIRECTORY));
+    FileSystemConfig dfsConfig = (FileSystemConfig) getDrillbitContext()
+        .getStorage()
+        .getDefinedConfig(StoragePluginTestUtils.DFS_PLUGIN_NAME);
+
+    String tmpWorkspacePath = dfsConfig.getWorkspaces()
+        .get(StoragePluginTestUtils.TMP_SCHEMA)
+        .getLocation();
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    WorkspaceConfig tmpDefaultFormatConfig = new WorkspaceConfig(
+        tmpWorkspacePath,
+        true,
+        "csvh",
+        false
+    );
+    dfsConfig.getWorkspaces().put("tmp_default_format", tmpDefaultFormatConfig);
   }
 
   @Test
   public void testHashInnerJoinWithLeftEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(true, false, false);
@@ -59,7 +79,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testHashInnerJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 inner join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(true, false, false);
@@ -75,7 +95,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testHashInnerJoinWithBothEmptyDirTables() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%1$s` t1 inner join dfs.tmp_default_format.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(true, false, false);
@@ -91,7 +111,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testHashLeftJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 left join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 1155;
 
       enableJoin(true, false, false);
@@ -107,7 +127,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testHashRightJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 right join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(true, false, false);
@@ -123,7 +143,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testMergeInnerJoinWithLeftEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, true, false);
@@ -139,7 +159,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testMergeInnerJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 inner join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, true, false);
@@ -155,7 +175,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testMergeInnerJoinWithBothEmptyDirTables() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%1$s` t1 inner join dfs.tmp_default_format.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, true, false);
@@ -171,7 +191,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testMergeLeftJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 left join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 1155;
 
       enableJoin(false, true, false);
@@ -187,7 +207,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testMergeRightJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 right join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, true, false);
@@ -203,7 +223,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopInnerJoinWithLeftEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%s` t1 inner join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, false, true);
@@ -219,7 +239,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopInnerJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 inner join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 inner join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, false, true);
@@ -235,7 +255,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopInnerJoinWithBothEmptyDirTables() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%1$s` t1 inner join dfs.tmp.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%1$s` t1 inner join dfs.tmp_default_format.`%1$s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, false, true);
@@ -251,7 +271,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopLeftJoinWithLeftEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%s` t1 left join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%s` t1 left join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 0;
 
       enableJoin(false, false, true);
@@ -271,7 +291,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopLeftJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 left join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 left join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 1155;
 
       enableJoin(false, false, true);
@@ -287,7 +307,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopRightJoinWithLeftEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from dfs.tmp.`%s` t1 right join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from dfs.tmp_default_format.`%s` t1 right join cp.`employee.json` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
       final int expectedRecordCount = 1155;
 
       enableJoin(false, false, true);
@@ -304,7 +324,7 @@ public class TestJoinEmptyDirTable extends JoinTestBase {
   public void testNestedLoopRightJoinWithRightEmptyDirTable() throws Exception {
     try {
       String query = String.format("select t1.`employee_id`, t1.`full_name`, t2.`employee_id`, t2.`full_name` " +
-          "from cp.`employee.json` t1 right join dfs.tmp.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
+          "from cp.`employee.json` t1 right join dfs.tmp_default_format.`%s` t2 on t1.`full_name` = t2.`full_name`", EMPTY_DIRECTORY);
 
       enableJoin(false, false, true);
       // The nested loops join does not support the "RIGHT OUTER JOIN" logical join operator.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
index 8d9d4fc690..f74671b687 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
@@ -17,44 +17,65 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-public class TestParquetGroupScan extends BaseTestQuery {
+public class TestParquetGroupScan extends ClusterTest {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
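+        // plugin name, workspace name, filesystem path, default input format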
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
+  }
 
   private void prepareTables(final String tableName, boolean refreshMetadata) throws Exception {
     // first create some parquet subfolders
-    testNoResult("CREATE TABLE dfs.tmp.`%s`      AS SELECT employee_id FROM cp.`employee.json` LIMIT 1", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/501`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 2", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/502`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 4", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/503`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 8", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/504`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 16", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/505`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 32", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/60`   AS SELECT employee_id FROM cp.`employee.json` LIMIT 64", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/602`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 128", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/6031` AS SELECT employee_id FROM cp.`employee.json` LIMIT 256", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/6032` AS SELECT employee_id FROM cp.`employee.json` LIMIT 512", tableName);
-    testNoResult("CREATE TABLE dfs.tmp.`%s/6033` AS SELECT employee_id FROM cp.`employee.json` LIMIT 1024", tableName);
+    run("CREATE TABLE dfs.tmp.`%s`      AS SELECT employee_id FROM cp.`employee.json` LIMIT 1", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/501`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 2", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/502`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 4", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/503`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 8", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/504`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 16", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/505`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 32", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/60`   AS SELECT employee_id FROM cp.`employee.json` LIMIT 64", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/602`  AS SELECT employee_id FROM cp.`employee.json` LIMIT 128", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/6031` AS SELECT employee_id FROM cp.`employee.json` LIMIT 256", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/6032` AS SELECT employee_id FROM cp.`employee.json` LIMIT 512", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/6033` AS SELECT employee_id FROM cp.`employee.json` LIMIT 1024",
+tableName);
 
     // we need an empty subfolder `6041` inside the table directory
     // to do this we first create a table inside that subfolder
-    testNoResult("CREATE TABLE dfs.tmp.`%s/6041/a` AS SELECT * FROM cp.`employee.json` LIMIT 1", tableName);
+    run("CREATE TABLE dfs.tmp.`%s/6041/a` AS SELECT * FROM cp.`employee.json` LIMIT 1", tableName);
     // then we delete the table, leaving the parent subfolder empty
-    testNoResult("DROP TABLE   dfs.tmp.`%s/6041/a`", tableName);
+    run("DROP TABLE   dfs.tmp.`%s/6041/a`", tableName);
 
     if (refreshMetadata) {
       // build the metadata cache file
-      testNoResult("REFRESH TABLE METADATA dfs.tmp.`%s`", tableName);
+      run("REFRESH TABLE METADATA dfs.tmp.`%s`", tableName);
     }
   }
 
   @Test
   public void testFix4376() throws Exception {
     prepareTables("4376_1", true);
+    long actualRecordCount = client.queryBuilder()
+        .sql("SELECT * FROM dfs.tmp.`4376_1/60*`")
+        .run()
+        .recordCount();
 
-    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_1/60*`");
     int expectedRecordCount = 1984;
     assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
@@ -64,7 +85,11 @@ public class TestParquetGroupScan extends BaseTestQuery {
   public void testWildCardEmptyWithCache() throws Exception {
     prepareTables("4376_2", true);
 
-    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_2/604*`");
+    long actualRecordCount = client.queryBuilder()
+        .sql("SELECT * FROM dfs.tmp.`4376_2/604*`")
+        .run()
+        .recordCount();
+
     int expectedRecordCount = 0;
     assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
@@ -74,7 +99,11 @@ public class TestParquetGroupScan extends BaseTestQuery {
   public void testWildCardEmptyNoCache() throws Exception {
     prepareTables("4376_3", false);
 
-    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_3/604*`");
+    long actualRecordCount = client.queryBuilder()
+        .sql("SELECT * FROM dfs.tmp_default_format.`4376_3/604*`")
+        .run()
+        .recordCount();
+
     int expectedRecordCount = 0;
     assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
@@ -84,7 +113,11 @@ public class TestParquetGroupScan extends BaseTestQuery {
   public void testSelectEmptyWithCache() throws Exception {
     prepareTables("4376_4", true);
 
-    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_4/6041`");
+    long actualRecordCount = client.queryBuilder()
+        .sql("SELECT * FROM dfs.tmp.`4376_4/6041`")
+        .run()
+        .recordCount();
+
     int expectedRecordCount = 0;
     assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
@@ -94,7 +127,11 @@ public class TestParquetGroupScan extends BaseTestQuery {
   public void testSelectEmptyNoCache() throws Exception {
     prepareTables("4376_5", false);
 
-    int actualRecordCount = testSql("SELECT * FROM dfs.tmp.`4376_5/6041`");
+    long actualRecordCount = client.queryBuilder()
+        .sql("SELECT * FROM dfs.tmp_default_format.`4376_5/6041`")
+        .run()
+        .recordCount();
+
     int expectedRecordCount = 0;
     assertEquals(String.format("Received unexpected number of rows in output: expected = %d, received = %s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 2ecc3b5e3c..d6a76a2fac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -19,14 +19,17 @@ package org.apache.drill.exec.store.parquet;
 
 import org.apache.commons.io.filefilter.FalseFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchemaBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.MetadataPathUtils;
 import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -46,141 +49,182 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class TestParquetMetadataCache extends PlanTestBase {
+public class TestParquetMetadataCache extends ClusterTest {
   private static final String TABLE_NAME_1 = "parquetTable1";
   private static final String TABLE_NAME_2 = "parquetTable2";
+  private static String tableBasePath;
 
   @BeforeClass
-  public static void copyData() {
+  public static void copyData() throws Exception {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(TABLE_NAME_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(TABLE_NAME_2));
     dirTestWatcher.copyResourceToRoot(Paths.get("parquet"));
+    tableBasePath = dirTestWatcher.getRootDir().getCanonicalPath();
+
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // A tmp workspace with a default format defined for tests that need to
+    // query empty directories without encountering an error.
+    cluster.defineWorkspace(
+        StoragePluginTestUtils.DFS_PLUGIN_NAME,
+        "tmp_default_format",
+        dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(),
+        "csvh"
+    );
   }
 
   @Test
   public void testPartitionPruningWithMetadataCache_1() throws Exception {
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
             " where dir0=1994 and dir1 in ('Q1', 'Q2')", TABLE_NAME_1);
     int expectedRowCount = 20;
     int expectedNumFiles = 2;
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1994", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_1);
-    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
   }
 
   @Test // DRILL-3917, positive test case for DRILL-4530
   @Category(UnlikelyTest.class)
   public void testPartitionPruningWithMetadataCache_2() throws Exception {
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
             " where dir0=1994", TABLE_NAME_1);
     int expectedRowCount = 40;
     int expectedNumFiles = 4;
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1994", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_1);
-    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {"Filter"});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .exclude("Filter")
+      .match();
   }
 
   @Test // DRILL-3937 (partitioning column is varchar)
   @Category(UnlikelyTest.class)
   public void testPartitionPruningWithMetadataCache_3() throws Exception {
     String tableName = "orders_ctas_varchar";
-    test("use dfs");
-    test("create table %s (o_orderdate, o_orderpriority) partition by (o_orderpriority) "
+    run("use dfs");
+    run("create table %s (o_orderdate, o_orderpriority) partition by (o_orderpriority) "
         + "as select o_orderdate, o_orderpriority from dfs.`multilevel/parquet/1994/Q1`", tableName);
-    test("refresh table metadata %s", tableName);
+    run("refresh table metadata %s", tableName);
     checkForMetadataFile(tableName);
     String query = String.format("select * from %s where o_orderpriority = '1-URGENT'", tableName);
     int expectedRowCount = 3;
     int expectedNumFiles = 1;
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern)
+      .match();
   }
 
   @Test // DRILL-3937 (partitioning column is binary using convert_to)
   @Category(UnlikelyTest.class)
   public void testPartitionPruningWithMetadataCache_4() throws Exception {
     String tableName = "orders_ctas_binary";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table %s (o_orderdate, o_orderpriority) partition by (o_orderpriority) "
+    run("create table %s (o_orderdate, o_orderpriority) partition by (o_orderpriority) "
         + "as select o_orderdate, convert_to(o_orderpriority, 'UTF8') as o_orderpriority "
         + "from dfs.`multilevel/parquet/1994/Q1`", tableName);
-    test("refresh table metadata %s", tableName);
+    run("refresh table metadata %s", tableName);
     checkForMetadataFile(tableName);
     String query = String.format("select * from %s where o_orderpriority = '1-URGENT'", tableName);
     int expectedRowCount = 3;
     int expectedNumFiles = 1;
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern)
+      .match();
   }
 
   @Test
   public void testCache() throws Exception {
     String tableName = "nation_ctas";
-    test("use dfs");
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("refresh table metadata %s", tableName);
+    run("use dfs");
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("refresh table metadata %s", tableName);
     checkForMetadataFile(tableName);
     String query = String.format("select * from %s", tableName);
-    int rowCount = testSql(query);
+    long rowCount = client.queryBuilder().sql(query).run().recordCount();
     Assert.assertEquals(50, rowCount);
-    testPlanMatchingPatterns(query, new String[] { "usedMetadataFile=true" }, new String[]{});
+
+    String usedMetaPattern = "usedMetadataFile=true";
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(usedMetaPattern)
+      .match();
   }
 
   @Test
   public void testUpdate() throws Exception {
     String tableName = "nation_ctas_update";
-    test("use dfs");
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("refresh table metadata %s", tableName);
+    run("use dfs");
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("refresh table metadata %s", tableName);
     checkForMetadataFile(tableName);
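+    // Sleep so the table created below gets a later timestamp than the cache file, marking the cache stale.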
     Thread.sleep(1000);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    int rowCount = testSql(String.format("select * from %s", tableName));
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    long rowCount = client.queryBuilder()
+      .sql("select * from %s", tableName)
+      .run()
+      .recordCount();
+
     Assert.assertEquals(50, rowCount);
   }
 
   @Test
   public void testCacheWithSubschema() throws Exception {
     String tableName = "nation_ctas_subschema";
-    test("create table dfs.`%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("refresh table metadata dfs.%s", tableName);
+    run("create table dfs.`%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("refresh table metadata dfs.%s", tableName);
     checkForMetadataFile(tableName);
-    int rowCount = testSql(String.format("select * from dfs.%s", tableName));
+    long rowCount = client.queryBuilder()
+      .sql("select * from dfs.%s", tableName)
+      .run()
+      .recordCount();
     Assert.assertEquals(25, rowCount);
   }
 
   @Test
   @Category(UnlikelyTest.class)
   public void testFix4449() throws Exception {
-    runSQL("CREATE TABLE dfs.tmp.`4449` PARTITION BY(l_discount) AS SELECT l_orderkey, l_discount FROM cp.`tpch/lineitem.parquet`");
-    runSQL("REFRESH TABLE METADATA dfs.tmp.`4449`");
+    run("CREATE TABLE dfs.tmp.`4449` PARTITION BY(l_discount) AS SELECT l_orderkey, l_discount FROM cp.`tpch/lineitem.parquet`");
+    run("REFRESH TABLE METADATA dfs.tmp.`4449`");
 
     testBuilder()
       .sqlQuery("SELECT COUNT(*) cnt FROM (" +
@@ -217,7 +261,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     dirTestWatcher.makeTestTmpSubDir(Paths.get(emptyDirName));
 
     testBuilder()
-        .sqlQuery("refresh table metadata dfs.tmp.`%s`", emptyDirName)
+        .sqlQuery("refresh table metadata dfs.tmp_default_format.`%s`", emptyDirName)
         .unOrdered()
         .baselineColumns("ok", "summary")
         .baselineValues(false, String.format("Table %s is empty and doesn't contain any parquet files.", emptyDirName))
@@ -251,7 +295,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Category(UnlikelyTest.class)
   public void testDrill4530_1() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -260,20 +304,24 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 20;
     int expectedNumFiles = 2;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995/Q3", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {"Filter"});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .exclude("Filter")
+      .match();
   }
 
   @Test // DRILL-4530  // single non-leaf level partition
   @Category(UnlikelyTest.class)
   public void testDrill4530_2() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -282,20 +330,24 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 80;
     int expectedNumFiles = 8;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {"Filter"});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .exclude("Filter")
+      .match();
   }
 
   @Test // DRILL-4530  // only dir1 filter is present, no dir0, hence this maps to multiple partitions
   @Category(UnlikelyTest.class)
   public void testDrill4530_3() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -304,20 +356,23 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 40;
     int expectedNumFiles = 4;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
   }
 
   @Test // DRILL-4530  // non-existent partition (1 subdirectory's cache file will still be read for schema)
   @Category(UnlikelyTest.class)
   public void testDrill4530_4() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -326,19 +381,22 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 0;
     int expectedNumFiles = 1;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/*/*", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
   }
 
   @Test // DRILL-4794
   @Category(UnlikelyTest.class)
   public void testDrill4794() throws Exception {
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
             " where dir0=1994 or dir1='Q3'", TABLE_NAME_1);
@@ -346,20 +404,23 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 60;
     int expectedNumFiles = 6;
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_1);
-    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
   }
 
   @Test // DRILL-4786
   @Category(UnlikelyTest.class)
   public void testDrill4786_1() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -369,13 +430,16 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 40;
     int expectedNumFiles = 4;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
 
   }
 
@@ -383,7 +447,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Category(UnlikelyTest.class)
   public void testDrill4786_2() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -393,20 +457,23 @@ public class TestParquetMetadataCache extends PlanTestBase {
     int expectedRowCount = 40;
     int expectedNumFiles = 4;
 
-    int actualRowCount = testSql(query1);
+    long actualRowCount = client.queryBuilder().sql(query1).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
   }
 
   @Test // DRILL-4877
   @Category(UnlikelyTest.class)
   public void testDrill4877() throws Exception {
     // create metadata cache
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_2);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_2);
     checkForMetadataFile(TABLE_NAME_2);
 
     // run query and check correctness
@@ -424,8 +491,11 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getRootDir().getCanonicalPath(), TABLE_NAME_2);
-    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {});
+    client.queryBuilder()
+      .sql(query1)
+      .planMatcher()
+      .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+      .match();
 
   }
 
@@ -435,19 +505,22 @@ public class TestParquetMetadataCache extends PlanTestBase {
     final String tableName = "nation_move";
     final String newTableName = "nation_moved";
     try {
-      test("use dfs");
-      test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-      test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-      test("refresh table metadata %s", tableName);
+      run("use dfs");
+      run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+      run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+      run("refresh table metadata %s", tableName);
       checkForMetadataFile(tableName);
       File srcFile = new File(dirTestWatcher.getRootDir(), tableName);
       File dstFile = new File(dirTestWatcher.getRootDir(), newTableName);
       FileUtils.moveDirectory(srcFile, dstFile);
       assertFalse("Cache file was not moved successfully", srcFile.exists());
-      int rowCount = testSql(String.format("select * from %s", newTableName));
+      long rowCount = client.queryBuilder()
+        .sql("select * from %s", newTableName)
+        .run()
+        .recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files", 50, rowCount);
     } finally {
-      test("drop table if exists %s", newTableName);
+      run("drop table if exists %s", newTableName);
     }
   }
 
@@ -465,15 +538,15 @@ public class TestParquetMetadataCache extends PlanTestBase {
 
     for (String metadataPath : metadataPaths) {
       try {
-        test("use dfs.tmp");
+        run("use dfs.tmp");
         // creating two inner directories to leverage METADATA_DIRECTORIES_FILENAME metadata file as well
         final Path absolutePathsMetadataT1 = tablePath.resolve("t1");
         final Path absolutePathsMetadataT2 = tablePath.resolve("t2");
 
         String createQuery = "create table `%s` as select * from cp.`tpch/nation.parquet`";
 
-        test(createQuery, absolutePathsMetadataT1);
-        test(createQuery, absolutePathsMetadataT2);
+        run(createQuery, absolutePathsMetadataT1);
+        run(createQuery, absolutePathsMetadataT2);
 
         Path relativePath = rootMetadataPath.resolve(metadataPath);
 
@@ -500,16 +573,20 @@ public class TestParquetMetadataCache extends PlanTestBase {
         String query = String.format("select * from %s", tablePath);
         int expectedRowCount = 50;
         int expectedNumFiles = 1; // point to selectionRoot since no pruning is done in this query
-        int actualRowCount = testSql(query);
+        long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
         assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
                       expectedRowCount, actualRowCount);
         String numFilesPattern = "numFiles=" + expectedNumFiles;
         String usedMetaPattern = "usedMetadataFile=true";
         String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getDfsTestTmpDir().getCanonicalPath(), tablePath);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-                                              new String[]{"Filter"});
+        client.queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+          .exclude("Filter")
+          .match();
       } finally {
-        test("drop table if exists %s", tablePath);
+        run("drop table if exists %s", tablePath);
       }
     }
   }
@@ -519,25 +596,29 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testSpacesInMetadataCachePath() throws Exception {
     final String pathWithSpaces = "path with spaces";
     try {
-      test("use dfs");
+      run("use dfs");
       // creating multilevel table to store path with spaces in both metadata files (METADATA and METADATA_DIRECTORIES)
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
-      test("create table `%1$s/%1$s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
-      test("refresh table metadata `%s`", pathWithSpaces);
+      run("create table `%s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
+      run("create table `%1$s/%1$s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
+      run("refresh table metadata `%s`", pathWithSpaces);
       checkForMetadataFile(pathWithSpaces);
       String query = String.format("select * from `%s`", pathWithSpaces);
       int expectedRowCount = 50;
       int expectedNumFiles = 1; // point to selectionRoot since no pruning is done in this query
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
           expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=true";
       String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", dirTestWatcher.getRootDir().getCanonicalPath(), pathWithSpaces);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-          new String[] {"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern, cacheFileRootPattern)
+        .exclude("Filter")
+        .match();
     } finally {
-      test("drop table if exists `%s`", pathWithSpaces);
+      run("drop table if exists `%s`", pathWithSpaces);
     }
   }
 
@@ -546,8 +627,8 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testFutureUnsupportedMetadataVersion() throws Exception {
     final String unsupportedMetadataVersion = "unsupported_metadata_version";
     try {
-      test("use dfs.tmp");
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", unsupportedMetadataVersion);
+      run("use dfs.tmp");
+      run("create table `%s` as select * from cp.`tpch/nation.parquet`", unsupportedMetadataVersion);
       MetadataVersion lastVersion = MetadataVersion.Constants.SUPPORTED_VERSIONS.last();
       // Get the future version, which is absent in MetadataVersions.SUPPORTED_VERSIONS set
       String futureVersion = new MetadataVersion(lastVersion.getMajor() + 1, 0).toString();
@@ -558,15 +639,18 @@ public class TestParquetMetadataCache extends PlanTestBase {
       String query = String.format("select * from %s", unsupportedMetadataVersion);
       int expectedRowCount = 25;
       int expectedNumFiles = 1;
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
           expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
-          new String[] {"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern)
+        .match();
     } finally {
-      test("drop table if exists %s", unsupportedMetadataVersion);
+      run("drop table if exists %s", unsupportedMetadataVersion);
     }
   }
 
@@ -574,23 +658,26 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testCorruptedMetadataFile() throws Exception {
     final String corruptedMetadata = "corrupted_metadata";
     try {
-      test("use dfs.tmp");
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", corruptedMetadata);
+      run("use dfs.tmp");
+      run("create table `%s` as select * from cp.`tpch/nation.parquet`", corruptedMetadata);
       dirTestWatcher.copyResourceToTestTmp(
         Paths.get("parquet", "unsupported_metadata", "corrupted_metadata.requires_replace.txt"),
         Paths.get(corruptedMetadata, Metadata.OLD_METADATA_FILENAME));
       String query = String.format("select * from %s", corruptedMetadata);
       int expectedRowCount = 25;
       int expectedNumFiles = 1;
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
           expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
-          new String[] {"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern)
+        .match();
     } finally {
-      test("drop table if exists %s", corruptedMetadata);
+      run("drop table if exists %s", corruptedMetadata);
     }
   }
 
@@ -598,23 +685,26 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testEmptyMetadataFile() throws Exception {
     final String emptyMetadataFile = "empty_metadata_file";
     try {
-      test("use dfs.tmp");
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", emptyMetadataFile);
+      run("use dfs.tmp");
+      run("create table `%s` as select * from cp.`tpch/nation.parquet`", emptyMetadataFile);
       dirTestWatcher.copyResourceToTestTmp(
         Paths.get("parquet", "unsupported_metadata", "empty_metadata_file.requires_replace.txt"),
         Paths.get(emptyMetadataFile, Metadata.OLD_METADATA_FILENAME));
       String query = String.format("select * from %s", emptyMetadataFile);
       int expectedRowCount = 25;
       int expectedNumFiles = 1;
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
           expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
-          new String[] {"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern)
+        .match();
     } finally {
-      test("drop table if exists %s", emptyMetadataFile);
+      run("drop table if exists %s", emptyMetadataFile);
     }
   }
 
@@ -626,8 +716,8 @@ public class TestParquetMetadataCache extends PlanTestBase {
       Paths.get(rootMetaCorruptedTable));
 
     try {
-      test("use dfs");
-      test("refresh table metadata `%s`", rootMetaCorruptedTable);
+      run("use dfs");
+      run("refresh table metadata `%s`", rootMetaCorruptedTable);
       checkForMetadataFile(rootMetaCorruptedTable);
       File rootMetadataFile = FileUtils.getFile(dataDir, Metadata.METADATA_FILENAME);
       assertTrue(String.format("Metadata cache file '%s' isn't deleted", rootMetadataFile.getPath()), rootMetadataFile.delete());
@@ -637,11 +727,16 @@ public class TestParquetMetadataCache extends PlanTestBase {
       String query = String.format("select dir0, dir1, o_custkey, o_orderdate from `%s` " + " where dir0=1994 or dir1='Q3'", rootMetaCorruptedTable);
       int expectedRowCount = 60;
       int expectedNumFiles = 6;
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files", expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=false";
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[]{"cacheFileRoot", "Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern)
+        .exclude("Filter", "cacheFileRoot")
+        .match();
     } finally {
       FileUtils.deleteQuietly(dataDir);
     }
@@ -656,8 +751,8 @@ public class TestParquetMetadataCache extends PlanTestBase {
       Paths.get(innerMetaCorruptedTable));
 
     try {
-      test("use dfs");
-      test("refresh table metadata `%s`", innerMetaCorruptedTable);
+      run("use dfs");
+      run("refresh table metadata `%s`", innerMetaCorruptedTable);
       checkForMetadataFile(innerMetaCorruptedTable);
       File firstInnerMetadataFile = FileUtils.getFile(dataDir, "1994", Metadata.METADATA_FILENAME);
       File secondInnerMetadataFile = FileUtils.getFile(dataDir, "1994", "Q3", Metadata.METADATA_FILENAME);
@@ -672,13 +767,17 @@ public class TestParquetMetadataCache extends PlanTestBase {
           " where dir0=1994 or dir1='Q3'", innerMetaCorruptedTable);
       int expectedRowCount = 60;
       int expectedNumFiles = 6;
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
           expectedRowCount, actualRowCount);
       String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetadataFile=false";
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
-          new String[] {"cacheFileRoot", "Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, usedMetaPattern)
+        .exclude("Filter", "cacheFileRoot")
+        .match();
     } finally {
       FileUtils.deleteQuietly(dataDir);
     }
@@ -697,24 +796,32 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testMetadataCacheFieldWithDots() throws Exception {
     final String tableWithDots = "dfs.tmp.`complex_table`";
     try {
-      test("create table %s as\n" +
+      run("create table %s as\n" +
         "select cast(1 as int) as `column.with.dots`, t.`column`.`with.dots`\n" +
         "from cp.`store/parquet/complex/complex.parquet` t limit 1", tableWithDots);
 
       String query = String.format("select * from %s", tableWithDots);
       int expectedRowCount = 1;
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, null);
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .match();
 
-      test("refresh table metadata %s", tableWithDots);
+      run("refresh table metadata %s", tableWithDots);
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, null);
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .match();
     } finally {
-      test(String.format("drop table if exists %s", tableWithDots));
+      run(String.format("drop table if exists %s", tableWithDots));
     }
   }
 
@@ -722,23 +829,33 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testBooleanPartitionPruning() throws Exception {
     final String boolPartitionTable = "dfs.tmp.`interval_bool_partition`";
     try {
-      test("create table %s partition by (col_bln) as " +
+      run("create table %s partition by (col_bln) as " +
         "select * from cp.`parquet/alltypes_required.parquet`", boolPartitionTable);
 
       String query = String.format("select * from %s where col_bln = true", boolPartitionTable);
       int expectedRowCount = 2;
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
-      test("refresh table metadata %s", boolPartitionTable);
+      run("refresh table metadata %s", boolPartitionTable);
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
     } finally {
-      test("drop table if exists %s", boolPartitionTable);
+      run("drop table if exists %s", boolPartitionTable);
     }
   }
 
@@ -746,24 +863,34 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testIntervalDayPartitionPruning() throws Exception {
     final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`";
     try {
-      test("create table %s partition by (col_intrvl_day) as " +
+      run("create table %s partition by (col_intrvl_day) as " +
         "select * from cp.`parquet/alltypes_optional.parquet`", intervalDayPartitionTable);
 
       String query = String.format("select * from %s " +
         "where col_intrvl_day = cast('P26DT27386S' as interval day)", intervalDayPartitionTable);
       int expectedRowCount = 1;
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
-      test("refresh table metadata %s", intervalDayPartitionTable);
+      run("refresh table metadata %s", intervalDayPartitionTable);
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
     } finally {
-      test(String.format("drop table if exists %s", intervalDayPartitionTable));
+      run(String.format("drop table if exists %s", intervalDayPartitionTable));
     }
   }
 
@@ -771,50 +898,60 @@ public class TestParquetMetadataCache extends PlanTestBase {
   public void testIntervalYearPartitionPruning() throws Exception {
     final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`";
     try {
-      test("create table %s partition by (col_intrvl_yr) as " +
+      run("create table %s partition by (col_intrvl_yr) as " +
         "select * from cp.`parquet/alltypes_optional.parquet`", intervalYearPartitionTable);
 
       String query = String.format("select * from %s where col_intrvl_yr = cast('P314M' as interval year)",
         intervalYearPartitionTable);
       int expectedRowCount = 1;
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
-      test("refresh table metadata %s", intervalYearPartitionTable);
+      run("refresh table metadata %s", intervalYearPartitionTable);
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
     } finally {
-      test("drop table if exists %s", intervalYearPartitionTable);
+      run("drop table if exists %s", intervalYearPartitionTable);
     }
   }
 
   @Test // DRILL-4139
   public void testIntWithNullsPartitionPruning() throws Exception {
     try {
-      test("create table dfs.tmp.`t5/a` as\n" +
+      run("create table dfs.tmp.`t5/a` as\n" +
         "select 100 as mykey from cp.`tpch/nation.parquet`\n" +
         "union all\n" +
         "select col_notexist from cp.`tpch/region.parquet`");
 
-      test("create table dfs.tmp.`t5/b` as\n" +
+      run("create table dfs.tmp.`t5/b` as\n" +
         "select 200 as mykey from cp.`tpch/nation.parquet`\n" +
         "union all\n" +
         "select col_notexist from cp.`tpch/region.parquet`");
 
       String query = "select mykey from dfs.tmp.`t5` where mykey = 100";
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 25, actualRowCount);
 
-      test("refresh table metadata dfs.tmp.`t5`");
+      run("refresh table metadata dfs.tmp.`t5`");
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 25, actualRowCount);
     } finally {
-      test("drop table if exists dfs.tmp.`t5`");
+      run("drop table if exists dfs.tmp.`t5`");
     }
   }
 
@@ -822,50 +959,70 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Category(UnlikelyTest.class)
   public void testPartitionPruningWithIsNull() throws Exception {
     try {
-      test("create table dfs.tmp.`t6/a` as\n" +
+      run("create table dfs.tmp.`t6/a` as\n" +
         "select col_notexist as mykey from cp.`tpch/region.parquet`");
 
-      test("create table dfs.tmp.`t6/b` as\n" +
+      run("create table dfs.tmp.`t6/b` as\n" +
         "select 100 as mykey from cp.`tpch/region.parquet`");
 
       String query = "select mykey from dfs.tmp.t6 where mykey is null";
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 5, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
-      test("refresh table metadata dfs.tmp.`t6`");
+      run("refresh table metadata dfs.tmp.`t6`");
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 5, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
     } finally {
-      test("drop table if exists dfs.tmp.`t6`");
+      run("drop table if exists dfs.tmp.`t6`");
     }
   }
 
   @Test // DRILL-4139
   public void testPartitionPruningWithIsNotNull() throws Exception {
     try {
-      test("create table dfs.tmp.`t7/a` as\n" +
+      run("create table dfs.tmp.`t7/a` as\n" +
         "select col_notexist as mykey from cp.`tpch/region.parquet`");
 
-      test("create table dfs.tmp.`t7/b` as\n" +
+      run("create table dfs.tmp.`t7/b` as\n" +
         "select 100 as mykey from cp.`tpch/region.parquet`");
 
       String query = "select mykey from dfs.tmp.t7 where mykey is null";
 
-      int actualRowCount = testSql(query);
+      long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 5, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
-      test("refresh table metadata dfs.tmp.`t7`");
+      run("refresh table metadata dfs.tmp.`t7`");
 
-      actualRowCount = testSql(query);
+      actualRowCount = client.queryBuilder().sql(query).run().recordCount();
       assertEquals("Row count does not match the expected value", 5, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
     } finally {
-      test("drop table if exists dfs.tmp.`t7`");
+      run("drop table if exists dfs.tmp.`t7`");
     }
   }
 
@@ -911,10 +1068,10 @@ public class TestParquetMetadataCache extends PlanTestBase {
 
   @Test
   public void testAutoRefreshPartitionPruning() throws Exception {
-    test("create table dfs.tmp.`orders` partition by (o_orderstatus) as\n" +
+    run("create table dfs.tmp.`orders` partition by (o_orderstatus) as\n" +
         "select * from cp.`tpch/orders.parquet`");
 
-    test("refresh table metadata dfs.tmp.`orders`");
+    run("refresh table metadata dfs.tmp.`orders`");
 
     File ordersTable = new File(dirTestWatcher.getDfsTestTmpDir(), "orders");
 
@@ -924,58 +1081,62 @@ public class TestParquetMetadataCache extends PlanTestBase {
 
     String query = "select * from dfs.tmp.`orders`\n" +
         "where o_orderstatus = 'O' and o_orderdate < '1995-03-10'";
-    PlanTestBase.testPlanOneExpectedPattern(query, "numRowGroups=1");
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numRowGroups=1")
+      .match();
 
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals("Row count does not match the expected value", 1, actualRowCount);
     // TODO: Check that the metadata cache file is actually regenerated, once Drill uses a JDK version with JDK-8177809 resolved.
   }
 
   @Test
   public void testRefreshDefault() throws Exception {
-    test("refresh table metadata dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
             " where dir0=1994 and dir1 in ('Q1', 'Q2')", TABLE_NAME_1);
     int expectedRowCount = 20;
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
   }
 
   @Test
   public void testRefreshWithColumns() throws Exception {
-    test("refresh table metadata columns (`date`, o_orderdate) dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata columns (`date`, o_orderdate) dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey as `date`, o_orderdate from dfs.`%s` " +
             " where dir0=1994 and dir1 in ('Q1', 'Q2')", TABLE_NAME_1);
     int expectedRowCount = 20;
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
   }
 
   @Test
   public void testRefreshNone() throws Exception {
-    test("refresh table metadata columns none dfs.`%s`", TABLE_NAME_1);
+    run("refresh table metadata columns none dfs.`%s`", TABLE_NAME_1);
     checkForMetadataFile(TABLE_NAME_1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs.`%s` " +
             " where dir0=1994 and dir1 in ('Q1', 'Q2')", TABLE_NAME_1);
     int expectedRowCount = 20;
-    int actualRowCount = testSql(query);
+    long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
     assertEquals(expectedRowCount, actualRowCount);
   }
 
   @Test // Test total rowcount from the summary file
   public void testTotalRowCount() throws Exception {
     String tableName = "nation_ctas_rowcount";
-    test("use dfs");
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("use dfs");
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
 
     String query = String.format("select * from `%s`", tableName);
-    long rowCount = testSql(query);
-    test("refresh table metadata %s", tableName);
+    long rowCount = client.queryBuilder().sql(query).run().recordCount();
+    run("refresh table metadata %s", tableName);
     checkForMetadataFile(tableName);
     createMetadataDir(tableName);
 
@@ -990,15 +1151,15 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test // Test total row count of sub-dir in the summary file.
   public void testTotalRowCountSubDir() throws Exception {
     String tableName = "nation_ctas_rowcount_subdir";
-    test("use dfs");
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("use dfs");
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
 
     String query = String.format("select * from `%s/t1`", tableName);
-    long rowCount = testSql(query);
-    test("refresh table metadata %s", tableName);
+    long rowCount = client.queryBuilder().sql(query).run().recordCount();
+    run("refresh table metadata %s", tableName);
     tableName = tableName + "/t1";
     checkForMetadataFile(tableName);
     createMetadataDir(tableName);
@@ -1014,16 +1175,16 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test // Test total row count after adding a directory post-refresh
   public void testTotalRowCountAddDirectory() throws Exception {
     String tableName = "nation_ctas_rowcount_add_dir";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
 
-    test("refresh table metadata %s", tableName);
+    run("refresh table metadata %s", tableName);
     Thread.sleep(1000);
-    test("create table `%s/t5` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t5` as select * from cp.`tpch/nation.parquet`", tableName);
 
     String query = String.format("select count(*) as count from `%s`", tableName);
     String rowCountQuery = String.format("select t.totalRowCount as rowCount from `%s/metadataDir/summary_meta.json` as t", tableName);
@@ -1050,17 +1211,17 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test // Test total row count after adding a directory under a sub-dir post-refresh
   public void testTotalRowCountAddSubDir() throws Exception {
     String tableName = "nation_ctas_rowcount_add_subdir";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/nation.parquet`", tableName);
 
-    test("refresh table metadata %s", tableName);
+    run("refresh table metadata %s", tableName);
     Thread.sleep(1000);
     tableName = tableName + "/t1";
-    test("create table `%s/t5` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t5` as select * from cp.`tpch/nation.parquet`", tableName);
 
     String query = String.format("select count(*) as count from `%s`", tableName);
     String rowCountQuery = String.format("select t.totalRowCount as rowCount from `%s/metadataDir/summary_meta.json` as t", tableName);
@@ -1084,14 +1245,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testTotalRowCountAddFile() throws Exception {
     String tableName = "orders_ctas_refresh_add_file";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
     Thread.sleep(1000);
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet/1994/Q1/orders_94_q1.parquet"), Paths.get("orders_ctas_refresh_add_file/t1/q1.parquet"));
     tableName = tableName + "/t1";
@@ -1119,14 +1280,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshWithInterestingColumn() throws Exception {
     String tableName = "orders_ctas_refresh_interesting_col";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
     Thread.sleep(1000);
 
     String rowCountQuery = String.format("select t.allColumnsInteresting as allColumnsInteresting from `%s/metadataDir/summary_meta.json` as t", tableName);
@@ -1143,14 +1304,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testDefaultRefresh() throws Exception {
     String tableName = "orders_ctas_refresh_default";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata %s", tableName);
+    run("refresh table metadata %s", tableName);
     Thread.sleep(1000);
 
     String rowCountQuery = String.format("select t.allColumnsInteresting as allColumnsInteresting from `%s/metadataDir/summary_meta.json` as t", tableName);
@@ -1167,17 +1328,17 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testAutoRefreshWithInterestingColumn() throws Exception {
     String tableName = "orders_ctas_autorefresh_int_col";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
     Thread.sleep(1000);
-    test("create table `%s/t5` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("Select count(*) from `%s`", tableName);
+    run("create table `%s/t5` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("Select count(*) from `%s`", tableName);
     tableName = tableName + "/t5";
 
     checkForMetadataFile(tableName);
@@ -1196,17 +1357,17 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testAutoRefreshWithInterestingColumnFile() throws Exception {
     String tableName = "orders_ctas_autorefresh_add_file";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
     Thread.sleep(1000);
-    test("create table `%s/t5` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("Select count(*) from `%s`", tableName);
+    run("create table `%s/t5` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("Select count(*) from `%s`", tableName);
     tableName = tableName + "/t5";
 
     checkForMetadataFile(tableName);
@@ -1226,9 +1387,9 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshWithIsNull() throws Exception {
     String tableName = "orders_ctas_refresh_not_null";
-    test("use dfs");
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("use dfs");
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
     String query = String.format("Select count(*) as cnt from `%s` where o_orderpriority is not null", tableName);
 
     checkForMetadataFile(tableName);
@@ -1243,14 +1404,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshExistentColumns() throws Exception {
     String tableName = "orders_ctas_ex";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
 
     String query = String.format("select count(*) as cnt from `%s` where o_orderdate is not null", tableName);
 
@@ -1260,7 +1421,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numRowGroupsPattern = "numRowGroups=" + expectedNumRowGroups;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, numRowGroupsPattern, usedMetaPattern}, new String[]{"Filter"});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern)
+      .include(numRowGroupsPattern)
+      .include(usedMetaPattern)
+      .exclude("Filter")
+      .match();
 
     testBuilder()
         .sqlQuery(query)
@@ -1274,14 +1442,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshNonExistentColumns() throws Exception {
     String tableName = "orders_ctas_nonex";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
 
     String query = String.format("select count(*) as cnt from `%s` where random is not null", tableName);
 
@@ -1291,7 +1459,13 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numRowGroupsPattern = "numRowGroups=" + expectedNumRowGroups;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, numRowGroupsPattern, usedMetaPattern});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern)
+      .include(numRowGroupsPattern)
+      .include(usedMetaPattern)
+      .match();
 
     testBuilder()
         .sqlQuery(query)
@@ -1304,14 +1478,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshNonExistentColumnFilter() throws Exception {
     String tableName = "orders_ctas_nonex_filter";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
 
     String query = String.format("select count(o_orderdate) as cnt from `%s` where random > 10", tableName);
 
@@ -1321,7 +1495,13 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numRowGroupsPattern = "numRowGroups=" + expectedNumRowGroups;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, numRowGroupsPattern, usedMetaPattern});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern)
+      .include(numRowGroupsPattern)
+      .include(usedMetaPattern)
+      .match();
 
     testBuilder()
         .sqlQuery(query)
@@ -1334,14 +1514,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshNonExAndNonIntColumnFilter() throws Exception {
     String tableName = "orders_ctas_nonex_nonint";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
 
     String query = String.format("select count(o_orderdate) as cnt from `%s` where random > 10 and o_orderpriority = '1_URGENT'", tableName);
 
@@ -1351,7 +1531,13 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numRowGroupsPattern = "numRowGroups=" + expectedNumRowGroups;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, numRowGroupsPattern, usedMetaPattern});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern)
+      .include(numRowGroupsPattern)
+      .include(usedMetaPattern)
+      .match();
 
     testBuilder()
         .sqlQuery(query)
@@ -1365,14 +1551,14 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshNonInterestingColumns() throws Exception {
     String tableName = "orders_ctas_nonint";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t3` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t4` as select * from cp.`tpch/orders.parquet`", tableName);
 
-    test("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
+    run("refresh table metadata COLUMNS (o_orderdate) %s", tableName);
 
     String query = String.format("select count(*) as cnt from `%s` where o_orderpriority is not null", tableName);
 
@@ -1382,7 +1568,13 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String numRowGroupsPattern = "numRowGroups=" + expectedNumRowGroups;
     String usedMetaPattern = "usedMetadataFile=true";
 
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, numRowGroupsPattern, usedMetaPattern});
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include(numFilesPattern)
+      .include(numRowGroupsPattern)
+      .include(usedMetaPattern)
+      .match();
 
     testBuilder()
         .sqlQuery(query)
@@ -1395,10 +1587,10 @@ public class TestParquetMetadataCache extends PlanTestBase {
   @Test
   public void testRefreshSchemaChange() throws Exception {
     String tableName = "orders_nation_ctas";
-    test("use dfs");
+    run("use dfs");
 
-    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
-    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    run("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    run("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
     String query = String.format("refresh table metadata %s", tableName);
 
     testBuilder()
@@ -1407,15 +1599,16 @@ public class TestParquetMetadataCache extends PlanTestBase {
             .baselineColumns("ok", "summary")
             .baselineValues(true, "Successfully updated metadata for table orders_nation_ctas.")
             .go();
+
     checkForMetadataFile(tableName);
   }
 
   @Test
   public void testRefreshWithDictColumn() throws Exception {
-    test("use dfs");
+    run("use dfs");
 
     String tableName = "parquet_map_ctas";
-    test("create table `%s` as select * from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
+    run("create table `%s` as select * from cp.`store/parquet/complex/map/parquet/000000_0.parquet`", tableName);
 
     String metadataQuery = String.format("refresh table metadata %s", tableName);
     testBuilder()
@@ -1436,6 +1629,38 @@ public class TestParquetMetadataCache extends PlanTestBase {
         .baselineValues(1, TestBuilder.mapOfObject("b", 6, "c", 7))
         .go();
 
-    PlanTestBase.testPlanMatchingPatterns(query, "numFiles=1", "usedMetadataFile=true");
+    client.queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numFiles=1", "usedMetadataFile=true")
+      .match();
+  }
+
+  private void checkForMetadataFile(String tableName) {
+    assertTrue(
+      String.format("There is no metadata cache file for the %s table", tableName),
+      MetadataPathUtils.checkForMetadataFile(tableBasePath, tableName)
+    );
+  }
+
+  /**
+   * Creates a temporary metadata directory so that the metadata summary cache file can be queried.
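+   * <p>For example, createMetadataDir("nation_ctas_rowcount") copies the table's
+   * summary_meta.json cache file into nation_ctas_rowcount/metadataDir.
+   *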
+   * @param table table name or table path
+   */
+  private void createMetadataDir(String table) throws IOException {
+    // getCanonicalPath() may throw IOException, which this method already declares,
+    // so there is no need to wrap it in a RuntimeException.
+    final String tmpDir = dirTestWatcher.getRootDir().getCanonicalPath();
+    File metadataDir = dirTestWatcher.makeRootSubDir(Paths.get(tmpDir, table, "metadataDir"));
+    // Locate the summary cache file whether the caller passed an absolute path or a table name.
+    File metaFile = table.startsWith(tmpDir)
+            ? FileUtils.getFile(table, Metadata.METADATA_SUMMARY_FILENAME)
+            : FileUtils.getFile(tmpDir, table, Metadata.METADATA_SUMMARY_FILENAME);
+    File tableFile = new File(tmpDir, table);
+    File newFile = new File(tableFile, "summary_meta.json");
+    FileUtils.copyFile(metaFile, newFile);
+    FileUtils.copyFileToDirectory(newFile, metadataDir);
   }
 }
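
The bulk of the churn in this file is a mechanical migration from the static PlanTestBase helpers to the fluent query-builder API on the test client fixture: test() becomes run(), testSql() becomes a queryBuilder() call, and testPlanMatchingPatterns() becomes a planMatcher() chain. A rough sketch of the two recurring idioms, where "query" stands in for the real SQL strings above and the include/exclude patterns are ones this file already uses:

    // Old style: PlanTestBase statics count rows and scan the text plan.
    int oldRowCount = testSql(query);
    PlanTestBase.testPlanMatchingPatterns(query,
        new String[] {"usedMetadataFile=true"},   // patterns the plan must contain
        new String[] {"Filter"});                 // patterns the plan must not contain

    // New style: the same checks expressed through the query builder.
    long newRowCount = client.queryBuilder().sql(query).run().recordCount();
    client.queryBuilder()
      .sql(query)
      .planMatcher()
      .include("usedMetadataFile=true")
      .exclude("Filter")
      .match();

Note that recordCount() returns a long where testSql() returned an int, which is why the row-count locals change type throughout the file.
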
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index 10ec029664..c666f394a1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -390,10 +390,10 @@ public class BaseTestQuery extends ExecTest {
     }
   }
 
-  protected static void testNoResult(int interation, String query, Object... args) throws Exception {
+  protected static void testNoResult(int iteration, String query, Object... args) throws Exception {
     query = String.format(query, args);
     logger.debug("Running query:\n--------------\n" + query);
-    for (int i = 0; i < interation; i++) {
+    for (int i = 0; i < iteration; i++) {
       List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
       for (QueryDataBatch queryDataBatch : results) {
         queryDataBatch.release();
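
For reference, testNoResult() formats its SQL with String.format-style varargs, runs the statement the requested number of times, and releases every returned batch without asserting on the results. A minimal usage sketch; the session option shown is illustrative only and not part of this commit:

    // Run the same statement three times, releasing all returned batches.
    testNoResult(3, "alter session set `planner.slice_target` = %s", 1);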