Posted to commits@drill.apache.org by ar...@apache.org on 2020/03/02 15:00:39 UTC

[drill] 02/08: DRILL-7477: Allow passing table function parameters into ANALYZE statement

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7abe97f4ed4b506dd9c86e6489f434c7f910d1c1
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Feb 20 18:39:33 2020 +0200

    DRILL-7477: Allow passing table function parameters into ANALYZE statement
    
    - Fix logical dir pruning when table function is used
    
    closes #2005
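
    For example, with this change statements of the following shape are
    accepted (the workspace and table names here are illustrative):

        ANALYZE TABLE table(dfs.tmp.`lineitem`(type => 'parquet')) REFRESH METADATA
        ANALYZE TABLE table(dfs.tmp.`lineitem`(type => 'parquet')) COMPUTE STATISTICS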
---
 .../src/main/codegen/includes/parserImpls.ftl      |  24 ++-
 .../drill/exec/planner/logical/DrillTable.java     |   7 +-
 .../drill/exec/planner/sql/SchemaUtilites.java     |  13 ++
 .../planner/sql/handlers/AnalyzeTableHandler.java  |  17 +-
 .../exec/planner/sql/handlers/DrillTableInfo.java  | 171 +++++++++++++++++++++
 .../sql/handlers/MetastoreAnalyzeTableHandler.java |  55 ++-----
 .../exec/planner/sql/parser/SqlAnalyzeTable.java   |  30 +---
 .../exec/planner/sql/parser/SqlCreateTable.java    |   8 +-
 .../exec/planner/sql/parser/SqlCreateView.java     |   8 +-
 .../exec/planner/sql/parser/SqlDropTable.java      |   7 +-
 .../planner/sql/parser/SqlDropTableMetadata.java   |   8 +-
 .../drill/exec/planner/sql/parser/SqlDropView.java |   7 +-
 .../sql/parser/SqlMetastoreAnalyzeTable.java       |  31 +---
 .../planner/sql/parser/SqlRefreshMetadata.java     |  10 +-
 .../drill/exec/planner/sql/parser/SqlSchema.java   |   7 +-
 .../apache/drill/exec/store/AbstractSchema.java    |   2 +
 .../exec/store/dfs/WorkspaceSchemaFactory.java     |  75 +++++----
 .../java/org/apache/drill/TestPartitionFilter.java | 113 ++++++++++----
 .../org/apache/drill/TestSelectWithOption.java     |  40 +++--
 .../org/apache/drill/exec/sql/TestAnalyze.java     |   2 +-
 .../drill/exec/sql/TestMetastoreCommands.java      |  30 ++++
 21 files changed, 445 insertions(+), 220 deletions(-)

diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index ae52f1f..67ecab0 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -700,15 +700,16 @@ Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() :
 /**
 * Parses ANALYZE statements:
  * <ul>
- * <li>ANALYZE TABLE [table_name] [COLUMNS (col1, col2, ...)] REFRESH METADATA [partition LEVEL] {COMPUTE | ESTIMATE} | STATISTICS [(column1, column2, ...)] [ SAMPLE numeric PERCENT ]
+ * <li>ANALYZE TABLE [table_name | table({table function name}(parameters))] [COLUMNS {(col1, col2, ...) | NONE}] REFRESH METADATA ['level' LEVEL] [{COMPUTE | ESTIMATE} | STATISTICS [ SAMPLE number PERCENT ]]
  * <li>ANALYZE TABLE [table_name] DROP [METADATA|STATISTICS] [IF EXISTS]
- * <li>ANALYZE TABLE [table_name] {COMPUTE | ESTIMATE} | STATISTICS [(column1, column2, ...)] [ SAMPLE numeric PERCENT ]
+ * <li>ANALYZE TABLE [table_name | table({table function name}(parameters))] {COMPUTE | ESTIMATE} | STATISTICS [(column1, column2, ...)] [ SAMPLE numeric PERCENT ]
  * </ul>
  */
 SqlNode SqlAnalyzeTable() :
 {
     SqlParserPos pos;
-    SqlIdentifier tblName;
+    SqlNode tableRef;
+    Span s = null;
     SqlNodeList fieldList = null;
     SqlNode level = null;
     SqlLiteral estimate = null;
@@ -719,7 +720,13 @@ SqlNode SqlAnalyzeTable() :
 {
     <ANALYZE> { pos = getPos(); }
     <TABLE>
-    tblName = CompoundIdentifier()
+    (
+        tableRef = CompoundIdentifier()
+    |
+        <TABLE> { s = span(); } <LPAREN>
+        tableRef = TableFunctionCall(s.pos())
+        <RPAREN>
+    )
     [
         (
             (
@@ -749,7 +756,7 @@ SqlNode SqlAnalyzeTable() :
             ]
             {
                 if (percent == null) { percent = SqlLiteral.createExactNumeric("100.0", pos); }
-                return new SqlAnalyzeTable(pos, tblName, estimate, fieldList, percent);
+                return new SqlAnalyzeTable(pos, tableRef, estimate, fieldList, percent);
             }
         )
         |
@@ -792,7 +799,7 @@ SqlNode SqlAnalyzeTable() :
                 }
             ]
             {
-                return new SqlMetastoreAnalyzeTable(pos, tblName, fieldList, level, estimate, percent);
+                return new SqlMetastoreAnalyzeTable(pos, tableRef, fieldList, level, estimate, percent);
             }
         )
         |
@@ -816,7 +823,10 @@ SqlNode SqlAnalyzeTable() :
                 if (checkMetadataExistence == null) {
                   checkMetadataExistence = SqlLiteral.createBoolean(true, pos);
                 }
-                return new SqlDropTableMetadata(pos, tblName, dropMetadata, checkMetadataExistence);
+                if (s != null) {
+                  throw new ParseException("Table functions shouldn't be used in a DROP METADATA statement.");
+                }
+                return new SqlDropTableMetadata(pos, (SqlIdentifier) tableRef, dropMetadata, checkMetadataExistence);
             }
         )
     ]
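
For illustration (table and workspace names hypothetical): the table function form is
accepted by the statistics and REFRESH METADATA branches above, while the DROP METADATA
branch still requires a plain identifier, so a statement such as

    ANALYZE TABLE table(dfs.tmp.`t`(type => 'parquet')) DROP METADATA

now fails with the ParseException added in this hunk.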
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index b4e1952..4bdf7cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.logical;
 
 import java.io.IOException;
 
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -168,9 +169,9 @@ public abstract class DrillTable implements Table {
   }
 
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
-    return new DrillScanRel(context.getCluster(),
-        context.getCluster().traitSetOf(DrillRel.DRILL_LOGICAL),
-        table);
+    // Returns a non-Drill table scan to allow directory-based partition pruning
+    // before the table group scan is created.
+    return EnumerableTableScan.create(context.getCluster(), table);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index c442a89..3ec98d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.sql;
 
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -328,4 +329,16 @@ public class SchemaUtilites {
     return schema;
   }
 
+  /**
+   * Returns the schema path corresponding to the specified table identifier.
+   * If the table identifier contains only a table name, an empty list is returned.
+   *
+   * @param tableIdentifier table identifier
+   * @return schema path corresponding to the specified table identifier
+   */
+  public static List<String> getSchemaPath(SqlIdentifier tableIdentifier) {
+    return tableIdentifier.isSimple()
+        ? Collections.emptyList()
+        : tableIdentifier.names.subList(0, tableIdentifier.names.size() - 1);
+  }
 }
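
A minimal sketch of what the new helper returns, using Calcite's SqlIdentifier
constructors (the names below are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.drill.exec.planner.sql.SchemaUtilites;

    // Compound identifier: everything except the trailing table name is the schema path.
    SqlIdentifier compound = new SqlIdentifier(
        Arrays.asList("dfs", "tmp", "lineitem"), SqlParserPos.ZERO);
    List<String> schemaPath = SchemaUtilites.getSchemaPath(compound); // ["dfs", "tmp"]

    // Simple identifier: no schema component, so an empty list is returned.
    SqlIdentifier simple = new SqlIdentifier("lineitem", SqlParserPos.ZERO);
    List<String> empty = SchemaUtilites.getSchemaPath(simple);       // []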
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
index b7a5add..4c5af1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
@@ -70,12 +70,12 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
 
     verifyNoUnsupportedFunctions(sqlAnalyzeTable);
 
-    SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
+    SqlNode tableRef = sqlAnalyzeTable.getTableRef();
     SqlSelect scanSql = new SqlSelect(
         SqlParserPos.ZERO,              /* position */
         SqlNodeList.EMPTY,              /* keyword list */
         getColumnList(sqlAnalyzeTable), /* select list */
-        tableIdentifier,                /* from */
+        tableRef,                       /* from */
         null,                           /* where */
         null,                           /* group by */
         null,                           /* having */
@@ -85,13 +85,14 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
         null                            /* fetch */
     );
 
-    final ConvertedRelNode convertedRelNode = validateAndConvert(rewrite(scanSql));
-    final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
+    ConvertedRelNode convertedRelNode = validateAndConvert(rewrite(scanSql));
+    RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
 
-    final RelNode relScan = convertedRelNode.getConvertedNode();
-    final String tableName = sqlAnalyzeTable.getName();
-    final AbstractSchema drillSchema = SchemaUtilites.resolveToDrillSchema(
-        config.getConverter().getDefaultSchema(), sqlAnalyzeTable.getSchemaPath());
+    RelNode relScan = convertedRelNode.getConvertedNode();
+    DrillTableInfo drillTableInfo = DrillTableInfo.getTableInfoHolder(sqlAnalyzeTable.getTableRef(), config);
+    String tableName = drillTableInfo.tableName();
+    AbstractSchema drillSchema = SchemaUtilites.resolveToDrillSchema(
+        config.getConverter().getDefaultSchema(), drillTableInfo.schemaPath());
     Table table = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
 
     if (table == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DrillTableInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DrillTableInfo.java
new file mode 100644
index 0000000..d8d0a75
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DrillTableInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.validate.SqlUserDefinedTableMacro;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Holder class for {@link DrillTable}, {@code tableName} and {@code schemaPath} obtained from
+ * {@code SqlNode tableRef}.
+ */
+public class DrillTableInfo {
+  private static final Logger logger = LoggerFactory.getLogger(DrillTableInfo.class);
+
+  private final DrillTable drillTable;
+  private final String tableName;
+  private final List<String> schemaPath;
+
+  private DrillTableInfo(DrillTable drillTable, List<String> schemaPath, String tableName) {
+    this.drillTable = drillTable;
+    this.tableName = tableName;
+    this.schemaPath = schemaPath;
+  }
+
+  public DrillTable drillTable() {
+    return drillTable;
+  }
+
+  public String tableName() {
+    return tableName;
+  }
+
+  public List<String> schemaPath() {
+    return schemaPath;
+  }
+
+  /**
+   * Returns a {@link DrillTableInfo} instance which holds the {@link DrillTable}, {@code tableName}
+   * and {@code schemaPath} corresponding to the specified {@code tableRef}.
+   *
+   * @param tableRef table identifier or table function call
+   * @param config   handler config
+   * @return {@link DrillTableInfo} instance
+   */
+  public static DrillTableInfo getTableInfoHolder(SqlNode tableRef, SqlHandlerConfig config) {
+    switch (tableRef.getKind()) {
+      case COLLECTION_TABLE: {
+        SqlCall call = (SqlCall) config.getConverter().validate(tableRef);
+        assert call.getOperandList().size() == 1;
+        SqlOperator operator = ((SqlCall) call.operand(0)).getOperator();
+        assert operator instanceof SqlUserDefinedTableMacro;
+        SqlUserDefinedTableMacro tableMacro = (SqlUserDefinedTableMacro) operator;
+        SqlIdentifier tableIdentifier = tableMacro.getSqlIdentifier();
+
+        AbstractSchema drillSchema = SchemaUtilites.resolveToDrillSchema(
+            config.getConverter().getDefaultSchema(), SchemaUtilites.getSchemaPath(tableIdentifier));
+
+        TranslatableTable translatableTable = tableMacro.getTable(config.getConverter().getTypeFactory(), prepareTableMacroOperands(call.operand(0)));
+        DrillTable table = ((DrillTranslatableTable) translatableTable).getDrillTable();
+        return new DrillTableInfo(table, drillSchema.getSchemaPath(), Util.last(tableIdentifier.names));
+      }
+      case IDENTIFIER: {
+        SqlIdentifier tableIdentifier = (SqlIdentifier) tableRef;
+        AbstractSchema drillSchema = SchemaUtilites.resolveToDrillSchema(
+            config.getConverter().getDefaultSchema(), SchemaUtilites.getSchemaPath(tableIdentifier));
+        String tableName = Util.last(tableIdentifier.names);
+        DrillTable table = getDrillTable(drillSchema, tableName);
+        return new DrillTableInfo(table, drillSchema.getSchemaPath(), tableName);
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported table ref kind: " + tableRef.getKind());
+    }
+  }
+
+  /**
+   * Returns the list of operands for the table function obtained from the specified call, in the
+   * order expected by the table function, with default values substituted for absent arguments.
+   * For example, for the following call:
+   * <pre>
+   *   `dfs`.`corrupted_dates`(`type` => 'parquet', `autoCorrectCorruptDates` => FALSE, `enableStringsSignedMinMax` => FALSE)
+   * </pre>
+   * the following list will be returned:
+   * <pre>
+   *   ['parquet', FALSE, FALSE, DEFAULT]
+   * </pre>
+   * whose elements correspond to the following parameters:
+   * <pre>
+   *   [type, autoCorrectCorruptDates, enableStringsSignedMinMax, schema]
+   * </pre>
+   *
+   * @param call SQL call whose arguments should be prepared
+   * @return list of operands for the table function
+   */
+  private static List<SqlNode> prepareTableMacroOperands(SqlCall call) {
+    Function<String, SqlNode> convertOperand = paramName -> call.getOperandList().stream()
+        .map(sqlNode -> (SqlCall) sqlNode)
+        .filter(sqlCall -> ((SqlIdentifier) sqlCall.operand(1)).getSimple().equals(paramName))
+        .peek(sqlCall -> Preconditions.checkState(sqlCall.getKind() == SqlKind.ARGUMENT_ASSIGNMENT))
+        .findFirst()
+        .map(sqlCall -> (SqlNode) sqlCall.operand(0))
+        .orElse(SqlStdOperatorTable.DEFAULT.createCall(SqlParserPos.ZERO));
+
+    SqlFunction operator = (SqlFunction) call.getOperator();
+
+    return operator.getParamNames().stream()
+        .map(convertOperand)
+        .collect(Collectors.toList());
+  }
+
+  private static DrillTable getDrillTable(AbstractSchema drillSchema, String tableName) {
+    Table tableFromSchema = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
+
+    if (tableFromSchema == null) {
+      throw UserException.validationError()
+          .message("No table with given name [%s] exists in schema [%s]", tableName, drillSchema.getFullSchemaName())
+          .build(logger);
+    }
+
+    switch (tableFromSchema.getJdbcTableType()) {
+      case TABLE:
+        if (tableFromSchema instanceof DrillTable) {
+          return (DrillTable) tableFromSchema;
+        } else {
+          throw UserException.validationError()
+              .message("ANALYZE does not support [%s] table kind", tableFromSchema.getClass().getSimpleName())
+              .build(logger);
+        }
+      default:
+        throw UserException.validationError()
+            .message("ANALYZE does not support [%s] object type", tableFromSchema.getJdbcTableType())
+            .build(logger);
+    }
+  }
+}
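
Typical usage from a SQL handler, mirroring the AnalyzeTableHandler change earlier in
this commit (a sketch; the handler's sqlAnalyzeTable and config fields are assumed to
be in scope):

    DrillTableInfo drillTableInfo =
        DrillTableInfo.getTableInfoHolder(sqlAnalyzeTable.getTableRef(), config);
    String tableName = drillTableInfo.tableName();          // trailing name of the identifier
    List<String> schemaPath = drillTableInfo.schemaPath();  // resolved Drill schema path
    DrillTable table = drillTableInfo.drillTable();         // works for plain names and table functions alike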
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
index 2be8dfc..856788c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.sql.handlers;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -55,7 +54,6 @@ import org.apache.drill.exec.planner.logical.MetadataControllerRel;
 import org.apache.drill.exec.planner.logical.MetadataHandlerRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlMetastoreAnalyzeTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -105,27 +103,26 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
     context.getOptions().setLocalOption(ExecConstants.METASTORE_ENABLED, false);
     SqlMetastoreAnalyzeTable sqlAnalyzeTable = unwrap(sqlNode, SqlMetastoreAnalyzeTable.class);
 
-    AbstractSchema drillSchema = SchemaUtilites.resolveToDrillSchema(
-        config.getConverter().getDefaultSchema(), sqlAnalyzeTable.getSchemaPath());
-    DrillTable table = getDrillTable(drillSchema, sqlAnalyzeTable.getName());
+    SqlNode tableRef = sqlAnalyzeTable.getTableRef();
 
-    AnalyzeInfoProvider analyzeInfoProvider = table.getGroupScan().getAnalyzeInfoProvider();
+    DrillTableInfo drillTableInfo = DrillTableInfo.getTableInfoHolder(tableRef, config);
+
+    AnalyzeInfoProvider analyzeInfoProvider = drillTableInfo.drillTable().getGroupScan().getAnalyzeInfoProvider();
 
     if (analyzeInfoProvider == null) {
       throw UserException.validationError()
-          .message("ANALYZE is not supported for group scan [%s]", table.getGroupScan())
+          .message("ANALYZE is not supported for group scan [%s]", drillTableInfo.drillTable().getGroupScan())
           .build(logger);
     }
 
     ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(context.getOptions());
 
-    SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
     // creates select with DYNAMIC_STAR column and analyze specific columns to obtain corresponding table scan
     SqlSelect scanSql = new SqlSelect(
         SqlParserPos.ZERO,
         SqlNodeList.EMPTY,
-        getColumnList(analyzeInfoProvider.getProjectionFields(table, getMetadataType(sqlAnalyzeTable), columnNamesOptions)),
-        tableIdentifier,
+        getColumnList(analyzeInfoProvider.getProjectionFields(drillTableInfo.drillTable(), getMetadataType(sqlAnalyzeTable), columnNamesOptions)),
+        tableRef,
         null,
         null,
         null,
@@ -140,7 +137,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
 
     RelNode relScan = convertedRelNode.getConvertedNode();
 
-    DrillRel drel = convertToDrel(relScan, drillSchema, table, sqlAnalyzeTable);
+    DrillRel drel = convertToDrel(relScan, sqlAnalyzeTable, drillTableInfo);
 
     Prel prel = convertToPrel(drel, validatedRowType);
     logAndSetTextPlan("Drill Physical", prel, logger);
@@ -150,31 +147,6 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
     return plan;
   }
 
-  private DrillTable getDrillTable(AbstractSchema drillSchema, String tableName) {
-    Table tableFromSchema = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
-
-    if (tableFromSchema == null) {
-      throw UserException.validationError()
-          .message("No table with given name [%s] exists in schema [%s]", tableName, drillSchema.getFullSchemaName())
-          .build(logger);
-    }
-
-    switch (tableFromSchema.getJdbcTableType()) {
-      case TABLE:
-        if (tableFromSchema instanceof DrillTable) {
-          return  (DrillTable) tableFromSchema;
-        } else {
-          throw UserException.validationError()
-              .message("ANALYZE does not support [%s] table kind", tableFromSchema.getClass().getSimpleName())
-              .build(logger);
-        }
-      default:
-        throw UserException.validationError()
-            .message("ANALYZE does not support [%s] object type", tableFromSchema.getJdbcTableType())
-            .build(logger);
-    }
-  }
-
   /**
    * Generates the column list with {@link SchemaPath#DYNAMIC_STAR} and columns required for analyze.
    */
@@ -203,18 +175,19 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
   /**
    * Converts to Drill logical plan
    */
-  private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema,
-      DrillTable table, SqlMetastoreAnalyzeTable sqlAnalyzeTable) throws ForemanSetupException, IOException {
+  private DrillRel convertToDrel(RelNode relNode, SqlMetastoreAnalyzeTable sqlAnalyzeTable, DrillTableInfo drillTableInfo) throws ForemanSetupException, IOException {
     RelBuilder relBuilder = LOGICAL_BUILDER.create(relNode.getCluster(), null);
 
+    DrillTable table = drillTableInfo.drillTable();
     AnalyzeInfoProvider analyzeInfoProvider = table.getGroupScan().getAnalyzeInfoProvider();
 
-    List<String> schemaPath = schema.getSchemaPath();
+    List<String> schemaPath = drillTableInfo.schemaPath();
     String pluginName = schemaPath.get(0);
     String workspaceName = Strings.join(schemaPath.subList(1, schemaPath.size()), AbstractSchema.SCHEMA_SEPARATOR);
 
+    String tableName = drillTableInfo.tableName();
     TableInfo tableInfo = TableInfo.builder()
-        .name(sqlAnalyzeTable.getName())
+        .name(tableName)
         .owner(table.getUserName())
         .type(analyzeInfoProvider.getTableTypeName())
         .storagePlugin(pluginName)
@@ -241,7 +214,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
           .tables()
           .basicRequests();
     } catch (MetastoreException e) {
-      logger.error("Error when obtaining Metastore instance for table {}", sqlAnalyzeTable.getName(), e);
+      logger.error("Error when obtaining Metastore instance for table {}", tableName, e);
       DrillRel convertedRelNode = convertToRawDrel(
           relBuilder.values(
                 new String[]{MetastoreAnalyzeConstants.OK_FIELD_NAME, MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME},
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
index 039fe0e..a383703 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.sql.parser;
 import java.util.List;
 
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -30,7 +29,6 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.Util;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
@@ -47,21 +45,21 @@ public class SqlAnalyzeTable extends DrillSqlCall {
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ANALYZE_TABLE", SqlKind.OTHER_DDL) {
     public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
       Preconditions.checkArgument(operands.length == 4, "SqlAnalyzeTable.createCall() has to get 4 operands!");
-      return new SqlAnalyzeTable(pos, (SqlIdentifier) operands[0], (SqlLiteral) operands[1],
+      return new SqlAnalyzeTable(pos, operands[0], (SqlLiteral) operands[1],
           (SqlNodeList) operands[2], (SqlNumericLiteral) operands[3]
       );
     }
   };
 
-  private final SqlIdentifier tblName;
+  private final SqlNode tableRef;
   private final SqlLiteral estimate;
   private final SqlNodeList fieldList;
   private final SqlNumericLiteral samplePercent;
 
-  public SqlAnalyzeTable(SqlParserPos pos, SqlIdentifier tblName, SqlLiteral estimate,
+  public SqlAnalyzeTable(SqlParserPos pos, SqlNode tableRef, SqlLiteral estimate,
       SqlNodeList fieldList, SqlNumericLiteral samplePercent) {
     super(pos);
-    this.tblName = tblName;
+    this.tableRef = tableRef;
     this.estimate = estimate;
     this.fieldList = fieldList;
     this.samplePercent = samplePercent;
@@ -75,7 +73,7 @@ public class SqlAnalyzeTable extends DrillSqlCall {
   @Override
   public List<SqlNode> getOperandList() {
     final List<SqlNode> operands = Lists.newArrayListWithCapacity(4);
-    operands.add(tblName);
+    operands.add(tableRef);
     operands.add(estimate);
     operands.add(fieldList);
     operands.add(samplePercent);
@@ -86,7 +84,7 @@ public class SqlAnalyzeTable extends DrillSqlCall {
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
     writer.keyword("ANALYZE");
     writer.keyword("TABLE");
-    tblName.unparse(writer, leftPrec, rightPrec);
+    tableRef.unparse(writer, leftPrec, rightPrec);
     writer.keyword(estimate.booleanValue() ? "ESTIMATE" : "COMPUTE");
     writer.keyword("STATISTICS");
 
@@ -114,20 +112,8 @@ public class SqlAnalyzeTable extends DrillSqlCall {
     return getSqlHandler(config, null);
   }
 
-  public List<String> getSchemaPath() {
-    if (tblName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return tblName.names.subList(0, tblName.names.size() - 1);
-  }
-
-  public SqlIdentifier getTableIdentifier() {
-    return tblName;
-  }
-
-  public String getName() {
-    return Util.last(tblName.names);
+  public SqlNode getTableRef() {
+    return tableRef;
   }
 
   public List<String> getFieldNames() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index 11e33aa..36eefa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql.parser;
 
 import java.util.List;
 
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
@@ -35,7 +36,6 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
 import org.apache.drill.exec.util.Pointer;
@@ -131,11 +131,7 @@ public class SqlCreateTable extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (tblName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return tblName.names.subList(0, tblName.names.size() - 1);
+    return SchemaUtilites.getSchemaPath(tblName);
   }
 
   public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
index e376697..94dd1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.planner.sql.parser;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
@@ -106,11 +106,7 @@ public class SqlCreateView extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (viewName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return viewName.names.subList(0, viewName.names.size()-1);
+    return SchemaUtilites.getSchemaPath(viewName);
   }
 
   public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTable.java
index f5bf0b7..797545e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql.parser;
 
 import java.util.List;
 
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.DropTableHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
@@ -87,11 +88,7 @@ public class SqlDropTable extends DrillSqlCall {
   }
 
   public List<String> getSchema() {
-    if (tableName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return tableName.names.subList(0, tableName.names.size()-1);
+    return SchemaUtilites.getSchemaPath(tableName);
   }
 
   public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTableMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTableMetadata.java
index 3bec792..9f21922 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTableMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropTableMetadata.java
@@ -27,13 +27,13 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.MetastoreDropTableMetadataHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.util.Pointer;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 public class SqlDropTableMetadata extends DrillSqlCall {
@@ -94,11 +94,7 @@ public class SqlDropTableMetadata extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (tableName.isSimple()) {
-      return Collections.emptyList();
-    }
-
-    return tableName.names.subList(0, tableName.names.size() - 1);
+    return SchemaUtilites.getSchemaPath(tableName);
   }
 
   public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
index bfd3474..2f5e002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql.parser;
 
 import java.util.List;
 
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.planner.sql.handlers.ViewHandler.DropView;
@@ -87,11 +88,7 @@ public class SqlDropView extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (viewName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return viewName.names.subList(0, viewName.names.size()-1);
+    return SchemaUtilites.getSchemaPath(viewName);
   }
 
   public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
index 2a22f90..a2bf8a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlMetastoreAnalyzeTable.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.planner.sql.parser;
 
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -28,7 +27,6 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.MetastoreAnalyzeTableHandler;
@@ -36,7 +34,6 @@ import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.util.Pointer;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -44,21 +41,21 @@ public class SqlMetastoreAnalyzeTable extends DrillSqlCall {
 
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("METASTORE_ANALYZE_TABLE", SqlKind.OTHER_DDL) {
     public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
-      return new SqlMetastoreAnalyzeTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], operands[2],
+      return new SqlMetastoreAnalyzeTable(pos, operands[0], (SqlNodeList) operands[1], operands[2],
           (SqlLiteral) operands[3], (SqlNumericLiteral) operands[4]);
     }
   };
 
-  private final SqlIdentifier tableName;
+  private final SqlNode tableRef;
   private final SqlNodeList fieldList;
   private final SqlLiteral level;
   private final SqlLiteral estimate;
   private final SqlNumericLiteral samplePercent;
 
-  public SqlMetastoreAnalyzeTable(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList fieldList,
+  public SqlMetastoreAnalyzeTable(SqlParserPos pos, SqlNode tableRef, SqlNodeList fieldList,
       SqlNode level, SqlLiteral estimate, SqlNumericLiteral samplePercent) {
     super(pos);
-    this.tableName = tableName;
+    this.tableRef = tableRef;
     this.fieldList = fieldList;
     this.level = level != null ? SqlLiteral.unchain(level) : null;
     this.estimate = estimate;
@@ -72,14 +69,14 @@ public class SqlMetastoreAnalyzeTable extends DrillSqlCall {
 
   @Override
   public List<SqlNode> getOperandList() {
-    return Arrays.asList(tableName, fieldList, level, estimate, samplePercent);
+    return Arrays.asList(tableRef, fieldList, level, estimate, samplePercent);
   }
 
   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
     writer.keyword("ANALYZE");
     writer.keyword("TABLE");
-    tableName.unparse(writer, leftPrec, rightPrec);
+    tableRef.unparse(writer, leftPrec, rightPrec);
     if (fieldList != null) {
       writer.keyword("COLUMNS");
       if (fieldList.size() > 0) {
@@ -120,20 +117,8 @@ public class SqlMetastoreAnalyzeTable extends DrillSqlCall {
     return getSqlHandler(config, null);
   }
 
-  public List<String> getSchemaPath() {
-    if (tableName.isSimple()) {
-      return Collections.emptyList();
-    }
-
-    return tableName.names.subList(0, tableName.names.size() - 1);
-  }
-
-  public SqlIdentifier getTableIdentifier() {
-    return tableName;
-  }
-
-  public String getName() {
-    return Util.last(tableName.names);
+  public SqlNode getTableRef() {
+    return tableRef;
   }
 
   public List<SchemaPath> getFieldNames() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
index 352357e..9dd66d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
@@ -29,11 +29,11 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -82,7 +82,7 @@ public class SqlRefreshMetadata extends DrillSqlCall {
       writer.keyword("COLUMNS");
       if (fieldList == null) {
         writer.keyword("NONE");
-      } else if (fieldList != null && fieldList.size() > 0) {
+      } else if (fieldList.size() > 0) {
         writer.keyword("(");
         fieldList.get(0).unparse(writer, leftPrec, rightPrec);
         for (int i = 1; i < fieldList.size(); i++) {
@@ -104,11 +104,7 @@ public class SqlRefreshMetadata extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (tblName.isSimple()) {
-      return ImmutableList.of();
-    }
-
-    return tblName.names.subList(0, tblName.names.size() - 1);
+    return SchemaUtilites.getSchemaPath(tblName);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
index 97f52a3..9fc3161 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
@@ -30,13 +30,13 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.util.SqlBasicVisitor;
 import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.planner.sql.handlers.SchemaHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,10 +79,7 @@ public abstract class SqlSchema extends DrillSqlCall {
   }
 
   public List<String> getSchemaPath() {
-    if (hasTable()) {
-      return table.isSimple() ? Collections.emptyList() : table.names.subList(0, table.names.size() - 1);
-    }
-    return null;
+    return hasTable() ? SchemaUtilites.getSchemaPath(table) : null;
   }
 
   public String getTableName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 4125ad3..c411683 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -261,6 +261,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
       Table table = getTable(name);
       if (table instanceof DrillTable) {
         return applyFunctionParameters((DrillTable) table, parameters, arguments);
+      } else if (table == null) {
+        return null;
       }
       throw new DrillRuntimeException(String.format("Table [%s] is not of Drill table instance. " +
         "Given instance is of [%s].", name, table.getClass().getName()));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 7d9d9dd..b68106c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -443,40 +443,8 @@ public class WorkspaceSchemaFactory {
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
       }
-      final DrillTable table = tables.get(tableKey);
-      if (table != null) {
-        MetadataProviderManager providerManager = null;
-
-        if (schemaConfig.getOption(ExecConstants.METASTORE_ENABLED).bool_val) {
-          try {
-            MetastoreRegistry metastoreRegistry = plugin.getContext().getMetastoreRegistry();
-            TableInfo tableInfo = TableInfo.builder()
-                .storagePlugin(plugin.getName())
-                .workspace(schemaName)
-                .name(tableName)
-                .build();
-
-            MetastoreTableInfo metastoreTableInfo = metastoreRegistry.get()
-                .tables()
-                .basicRequests()
-                .metastoreTableInfo(tableInfo);
-            if (metastoreTableInfo.isExists()) {
-              providerManager = new MetastoreMetadataProviderManager(metastoreRegistry, tableInfo,
-                  new MetastoreMetadataProviderConfig(schemaConfig.getOption(ExecConstants.METASTORE_USE_SCHEMA_METADATA).bool_val,
-                      schemaConfig.getOption(ExecConstants.METASTORE_USE_STATISTICS_METADATA).bool_val,
-                      schemaConfig.getOption(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA).bool_val));
-            }
-          } catch (MetastoreException e) {
-            logger.warn("Exception happened during obtaining Metastore instance.", e);
-          }
-        }
-        if (providerManager == null) {
-          providerManager = FileSystemMetadataProviderManager.init();
-        }
-        setMetadataTable(providerManager, table, tableName);
-        setSchema(providerManager, tableName);
-        table.setTableMetadataProviderManager(providerManager);
-      }
+      DrillTable table = tables.get(tableKey);
+      setMetadataProviderManager(table, tableName);
       return table;
     }
 
@@ -640,6 +608,7 @@ public class WorkspaceSchemaFactory {
           FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key);
           FormatSelection selection = new FormatSelection(formatConfig, newSelection);
           DrillTable drillTable = new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
+          setMetadataProviderManager(drillTable, key.sig.getName());
 
           List<TableParamDef> commonParams = key.sig.getCommonParams();
           if (commonParams.isEmpty()) {
@@ -654,6 +623,7 @@ public class WorkspaceSchemaFactory {
           for (final FormatMatcher matcher : dirMatchers) {
             try {
               DrillTable table = matcher.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig);
+              setMetadataProviderManager(table, key.sig.getName());
               if (table != null) {
                 return table;
               }
@@ -670,6 +640,7 @@ public class WorkspaceSchemaFactory {
 
         for (final FormatMatcher matcher : fileMatchers) {
           DrillTable table = matcher.isReadable(getFS(), newSelection, plugin, storageEngineName, schemaConfig);
+          setMetadataProviderManager(table, key.sig.getName());
           if (table != null) {
             return table;
           }
@@ -721,6 +692,42 @@ public class WorkspaceSchemaFactory {
       return null;
     }
 
+    private void setMetadataProviderManager(DrillTable table, String tableName) {
+      if (table != null) {
+        MetadataProviderManager providerManager = null;
+
+        if (schemaConfig.getOption(ExecConstants.METASTORE_ENABLED).bool_val) {
+          try {
+            MetastoreRegistry metastoreRegistry = plugin.getContext().getMetastoreRegistry();
+            TableInfo tableInfo = TableInfo.builder()
+                .storagePlugin(plugin.getName())
+                .workspace(schemaName)
+                .name(tableName)
+                .build();
+
+            MetastoreTableInfo metastoreTableInfo = metastoreRegistry.get()
+                .tables()
+                .basicRequests()
+                .metastoreTableInfo(tableInfo);
+            if (metastoreTableInfo.isExists()) {
+              providerManager = new MetastoreMetadataProviderManager(metastoreRegistry, tableInfo,
+                  new MetastoreMetadataProviderConfig(schemaConfig.getOption(ExecConstants.METASTORE_USE_SCHEMA_METADATA).bool_val,
+                      schemaConfig.getOption(ExecConstants.METASTORE_USE_STATISTICS_METADATA).bool_val,
+                      schemaConfig.getOption(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA).bool_val));
+            }
+          } catch (MetastoreException e) {
+            logger.warn("Exception happened during obtaining Metastore instance. File system metadata provider will be used.", e);
+          }
+        }
+        if (providerManager == null) {
+          providerManager = FileSystemMetadataProviderManager.init();
+        }
+        setMetadataTable(providerManager, table, tableName);
+        setSchema(providerManager, tableName);
+        table.setTableMetadataProviderManager(providerManager);
+      }
+    }
+
     @Override
     public void destroy(DrillTable value) {
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index 0395b61..c0271a0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -24,38 +24,64 @@ import java.nio.file.Paths;
 import org.apache.drill.categories.PlannerTest;
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({SqlTest.class, PlannerTest.class})
-public class TestPartitionFilter extends PlanTestBase {
+public class TestPartitionFilter extends ClusterTest {
 
-  private static void testExcludeFilter(String query, int expectedNumFiles,
-      String excludedFilterPattern, int expectedRowCount) throws Exception {
-    int actualRowCount = testSql(query);
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+  private void testExcludeFilter(String query, int expectedNumFiles,
+      String excludedFilterPattern, long expectedRowCount) throws Exception {
+    long actualRowCount = queryBuilder()
+        .sql(query)
+        .run()
+        .recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern}, new String[]{excludedFilterPattern});
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern)
+        .exclude(excludedFilterPattern)
+        .match();
   }
 
-  private static void testIncludeFilter(String query, int expectedNumFiles,
+  private void testIncludeFilter(String query, int expectedNumFiles,
       String includedFilterPattern, int expectedRowCount) throws Exception {
-    int actualRowCount = testSql(query);
+    long actualRowCount = queryBuilder()
+        .sql(query)
+        .run()
+        .recordCount();
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, includedFilterPattern}, new String[]{});
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(numFilesPattern, includedFilterPattern)
+        .match();
   }
 
   @BeforeClass
   public static void createParquetTable() throws Exception {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
 
-    test("alter session set `planner.disable_exchanges` = true");
-    test("create table dfs.tmp.parquet partition by (yr, qrtr) as select o_orderkey, o_custkey, " +
-        "o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, cast(dir0 as int) yr, dir1 qrtr " +
-        "from dfs.`multilevel/parquet`");
-    test("alter session set `planner.disable_exchanges` = false");
+    try {
+      client.alterSession(PlannerSettings.EXCHANGE.getOptionName(), true);
+      run("create table dfs.tmp.parquet partition by (yr, qrtr) as select o_orderkey, o_custkey, " +
+          "o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, cast(dir0 as int) yr, dir1 qrtr " +
+          "from dfs.`multilevel/parquet`");
+    } finally {
+      client.resetSession(PlannerSettings.EXCHANGE.getOptionName());
+    }
   }
 
   @Test  //Parquet: basic test with dir0 and dir1 filters
@@ -152,28 +178,28 @@ public class TestPartitionFilter extends PlanTestBase {
   public void testPartitionFilter4_Parquet() throws Exception {
     String query1 = "select t1.dir0, t1.dir1, t1.o_custkey, t1.o_orderdate, cast(t2.c_name as varchar(10)) from dfs.`multilevel/parquet` t1, cp.`tpch/customer.parquet` t2 where" +
       " t1.o_custkey = t2.c_custkey and t1.dir0=1994 and t1.dir1='Q1'";
-    test(query1);
+    run(query1);
   }
 
   @Test //Parquet: filters contain join conditions and partition filters
   public void testPartitionFilter4_Parquet_from_CTAS() throws Exception {
     String query1 = "select t1.dir0, t1.dir1, t1.o_custkey, t1.o_orderdate, cast(t2.c_name as varchar(10)) from dfs.tmp.parquet t1, cp.`tpch/customer.parquet` t2 where " +
       "t1.o_custkey = t2.c_custkey and t1.yr=1994 and t1.qrtr='Q1'";
-    test(query1);
+    run(query1);
   }
 
   @Test //Json: filters contain join conditions and partition filters
   public void testPartitionFilter4_Json() throws Exception {
     String query1 = "select t1.dir0, t1.dir1, t1.o_custkey, t1.o_orderdate, cast(t2.c_name as varchar(10)) from dfs.`multilevel/json` t1, cp.`tpch/customer.parquet` t2 where " +
     "cast(t1.o_custkey as bigint) = cast(t2.c_custkey as bigint) and t1.dir0=1994 and t1.dir1='Q1'";
-    test(query1);
+    run(query1);
   }
 
   @Test //CSV: filters contain join conditions and partition filters
   public void testPartitionFilter4_Csv() throws Exception {
     String query1 = "select t1.dir0, t1.dir1, t1.columns[1] as o_custkey, t1.columns[4] as o_orderdate, cast(t2.c_name as varchar(10)) from dfs.`multilevel/csv` t1, cp" +
     ".`tpch/customer.parquet` t2 where cast(t1.columns[1] as bigint) = cast(t2.c_custkey as bigint) and t1.dir0=1994 and t1.dir1='Q1'";
-    test(query1);
+    run(query1);
   }
 
   @Test // Parquet: IN filter
@@ -364,6 +390,14 @@ public class TestPartitionFilter extends PlanTestBase {
   }
 
   @Test
+  public void testLogicalDirPruningWithTableFunction() throws Exception {
+    // 1995/Q1 contains one valid Parquet file, while 1996/Q1 contains a badly formatted one.
+    // If dir pruning happens during logical planning, the query runs fine, since the bad file has been pruned before the ParquetGroupScan is built.
+    String query = "select dir0, o_custkey from table(dfs.`multilevel/parquetWithBadFormat` (type => 'parquet')) where dir0=1995";
+    testExcludeFilter(query, 1, "Filter\\(", 10);
+  }
+
+  @Test
   public void testLogicalDirPruning2() throws Exception {
     // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet.
     // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan.
@@ -388,12 +422,16 @@ public class TestPartitionFilter extends PlanTestBase {
   @Test //DRILL-3710 Partition pruning should occur with varying IN-LIST size
   public void testPartitionFilterWithInSubquery() throws Exception {
     String query = "select * from dfs.`multilevel/parquet` where cast (dir0 as int) IN (1994, 1994, 1994, 1994, 1994, 1994)";
-    /* In list size exceeds threshold - no partition pruning since predicate converted to join */
-    test("alter session set `planner.in_subquery_threshold` = 2");
-    testExcludeFilter(query, 12, "Filter\\(", 40);
-    /* In list size does not exceed threshold - partition pruning */
-    test("alter session set `planner.in_subquery_threshold` = 10");
-    testExcludeFilter(query, 4, "Filter\\(", 40);
+    try {
+      /* In list size exceeds threshold - no partition pruning since predicate converted to join */
+      client.alterSession(PlannerSettings.IN_SUBQUERY_THRESHOLD.getOptionName(), 2);
+      testExcludeFilter(query, 12, "Filter\\(", 40);
+      /* In list size does not exceed threshold - partition pruning */
+      client.alterSession(PlannerSettings.IN_SUBQUERY_THRESHOLD.getOptionName(), 10);
+      testExcludeFilter(query, 4, "Filter\\(", 40);
+    } finally {
+      client.resetSession(PlannerSettings.IN_SUBQUERY_THRESHOLD.getOptionName());
+    }
   }
 
   @Test // DRILL-4825: querying same table with different filter in UNION ALL.
@@ -402,17 +440,19 @@ public class TestPartitionFilter extends PlanTestBase {
         + "( select dir0 from dfs.`multilevel/parquet` where dir0 in ('1994') union all "
         + "  select dir0 from dfs.`multilevel/parquet` where dir0 in ('1995', '1996') )";
 
-    String [] excluded = {"Filter\\("};
-
     // verify plan that filter is applied in partition pruning.
-    testPlanMatchingPatterns(query, null, excluded);
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Filter\\(")
+        .match();
 
     // verify we get correct count(*).
     testBuilder()
         .sqlQuery(query)
         .unOrdered()
         .baselineColumns("cnt")
-        .baselineValues((long)120)
+        .baselineValues(120L)
         .build()
         .run();
   }
@@ -424,9 +464,12 @@ public class TestPartitionFilter extends PlanTestBase {
             + " ( select sum(o_custkey) as y from dfs.`multilevel/parquet` where dir0 in ('1995', '1996')) "
             + " on x = y ";
 
-    String [] excluded = {"Filter\\("};
     // verify plan that filter is applied in partition pruning.
-    testPlanMatchingPatterns(query, null, excluded);
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .exclude("Filter\\(")
+        .match();
 
     // verify we get empty result.
     testBuilder()
@@ -446,11 +489,19 @@ public class TestPartitionFilter extends PlanTestBase {
     String [] excluded = {"1995", "Filter\\("};
 
     // verify we get correct count(*).
-    int actualRowCount = testSql(query);
+    long actualRowCount = queryBuilder()
+        .sql(query)
+        .run()
+        .recordCount();
     int expectedRowCount = 800;
     assertEquals("Expected and actual row count should match", expectedRowCount, actualRowCount);
 
     // verify plan that filter is applied in partition pruning.
-    testPlanMatchingPatterns(query, expectedPlan, excluded);
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlan)
+        .exclude(excluded)
+        .match();
   }
 }
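
For reference, the pruning scenario exercised by the table-function tests above reduces to a minimal sketch. This is illustrative only, not part of the patch: the table path is hypothetical, and it assumes a directory-partitioned parquet table read through table(...), reusing the queryBuilder()/planMatcher() helpers shown in the hunks above.

  // Minimal sketch (hypothetical table path): when directory pruning happens
  // during logical planning, the Filter on dir0 is removed from the plan.
  @Test
  public void tableFunctionPruningSketch() throws Exception {
    String query = "select dir0 from table(dfs.`some/partitioned/table` (type => 'parquet')) where dir0 = 1995";
    queryBuilder()
        .sql(query)
        .planMatcher()
        .exclude("Filter\\(")   // pruned at plan time, so no Filter operator remains
        .match();
  }
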
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index c5c283d..294a375 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -28,15 +28,28 @@ import java.io.IOException;
 import java.nio.file.Paths;
 
 import org.apache.drill.categories.SqlTest;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 @Category(SqlTest.class)
-public class TestSelectWithOption extends BaseTestQuery {
+public class TestSelectWithOption extends ClusterTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
 
   private File genCSVFile(String name, String... rows) throws IOException {
     File file = new File(format("%s/%s.csv", dirTestWatcher.getRootDir(), name));
@@ -254,12 +267,12 @@ public class TestSelectWithOption extends BaseTestQuery {
         "{\"columns\": [\"f\",\"g\"]}");
     String jsonTableName = String.format("dfs.`%s`", f.getName());
     // the extension is actually csv
-    test("use dfs");
+    run("use dfs");
     try {
       testWithResult(format("select columns from table(%s(type => 'JSON'))", jsonTableName), listOf("f","g"));
       testWithResult(format("select length(columns[0]) as columns from table(%s (type => 'JSON'))", jsonTableName), 1L);
     } finally {
-      test("use sys");
+      run("use sys");
     }
   }
 
@@ -268,7 +281,7 @@ public class TestSelectWithOption extends BaseTestQuery {
     String schema = "cp.default";
     String tableName = "absent_table";
     try {
-      test("select * from table(`%s`.`%s`(type=>'parquet'))", schema, tableName);
+      run("select * from table(`%s`.`%s`(type=>'parquet'))", schema, tableName);
     } catch (UserRemoteException e) {
       assertThat(e.getMessage(), containsString(String.format("Unable to find table [%s]", tableName)));
       throw e;
@@ -279,10 +292,10 @@ public class TestSelectWithOption extends BaseTestQuery {
   public void testTableFunctionWithDirectoryExpansion() throws Exception {
     String tableName = "dirTable";
     String query = "select 'A' as col from (values(1))";
-    test("use dfs.tmp");
+    run("use dfs.tmp");
     try {
-      alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
-      test("create table %s as %s", tableName, query);
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+      run("create table %s as %s", tableName, query);
 
       testBuilder()
         .sqlQuery("select * from table(%s(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName)
@@ -290,8 +303,8 @@ public class TestSelectWithOption extends BaseTestQuery {
         .sqlBaselineQuery(query)
         .go();
     } finally {
-      resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION);
-      test("drop table if exists %s", tableName);
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+      run("drop table if exists %s", tableName);
     }
   }
 
@@ -305,4 +318,11 @@ public class TestSelectWithOption extends BaseTestQuery {
       .go();
   }
 
+  @Test
+  public void testTableFunctionWithNonExistingTable() throws Exception {
+    thrown.expect(UserException.class);
+    thrown.expectMessage("Unable to find table");
+    run("select * from table(dfs.tmp.`nonExistingTable`(schema=>'inline=(mykey int)'))");
+  }
+
 }
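
The migration above consistently wraps session option changes in try/finally. A minimal sketch of that pattern, with an illustrative option value and table name (not taken from the patch):

  // Sketch only: alter options through the cluster client, do the work,
  // and always reset in finally so later tests see default session state.
  try {
    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
    run("create table dfs.tmp.sketch_tbl as select 1 as col from (values(1))");
  } finally {
    client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
    run("drop table if exists dfs.tmp.sketch_tbl");
  }
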
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
index 121b784..5e866f0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -110,7 +110,7 @@ public class TestAnalyze extends ClusterTest {
       client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "parquet");
       client.alterSession(ExecConstants.DETERMINISTIC_SAMPLING, true);
       run("CREATE TABLE dfs.tmp.employee_basic3 AS SELECT * from cp.`employee.json`");
-      run("ANALYZE TABLE dfs.tmp.employee_basic3 COMPUTE STATISTICS (employee_id, birth_date) SAMPLE 55 PERCENT");
+      run("ANALYZE TABLE table(dfs.tmp.employee_basic3 (type => 'parquet')) COMPUTE STATISTICS (employee_id, birth_date) SAMPLE 55 PERCENT");
 
       testBuilder()
           .sqlQuery("SELECT tbl.`columns`.`column` as `column`, tbl.`columns`.rowcount is not null as has_rowcount,"
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 394adca..4b0aad7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -3470,6 +3470,36 @@ public class TestMetastoreCommands extends ClusterTest {
     }
   }
 
+  @Test
+  public void testTableFunctionForParquet() throws Exception {
+    String tableName = "corrupted_dates";
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "4203_corrupt_dates").resolve("mixed_drill_versions"), Paths.get(tableName));
+
+    // sets autoCorrectCorruptDates to false to store incorrect metadata, which will then be used during file and filter pruning
+    testBuilder()
+        .sqlQuery("analyze table table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false, enableStringsSignedMinMax=>false)) REFRESH METADATA", tableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+        .go();
+
+    queryBuilder()
+        .sql("select date_col from dfs.`%s` where date_col > '2016-01-01'", tableName)
+        .planMatcher()
+        .include("usedMetastore=true")
+        .exclude("Filter")
+        .match();
+  }
+
+  @Test
+  public void testTableFunctionWithDrop() throws Exception {
+    String tableName = "dropWithTableFunction";
+    dirTestWatcher.copyResourceToTestTmp(Paths.get("tpchmulti", "nation"), Paths.get(tableName));
+
+    thrown.expect(UserRemoteException.class);
+    run("analyze table table(dfs.tmp.`%s` (type => 'parquet', autoCorrectCorruptDates => false, enableStringsSignedMinMax=>false)) DROP METADATA", tableName);
+  }
+
   private static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue,
       long rowCount, TypeProtos.MinorType minorType) {
     return new ColumnStatistics<>(