You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/08/15 15:42:42 UTC

[drill] branch master updated: DRILL-6680: Expose show files command into INFORMATION_SCHEMA

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b8376cc  DRILL-6680: Expose show files command into INFORMATION_SCHEMA
b8376cc is described below

commit b8376cce07dc6b5fe2b3408a4b4a88c3b21816dd
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Mon Jul 30 23:50:12 2018 +0300

    DRILL-6680: Expose show files command into INFORMATION_SCHEMA
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   2 +
 .../planner/sql/handlers/DescribeTableHandler.java |  12 +-
 .../exec/planner/sql/handlers/ShowFileHandler.java | 107 -------------
 .../sql/handlers/ShowFilesCommandResult.java       |  82 ----------
 .../planner/sql/handlers/ShowFilesHandler.java     | 124 +++++++++++++++
 .../planner/sql/handlers/ShowSchemasHandler.java   |  10 +-
 .../planner/sql/handlers/ShowTablesHandler.java    |   6 +-
 .../exec/planner/sql/parser/SqlShowFiles.java      |   6 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../exec/store/ischema/InfoSchemaConstants.java    |  88 +++++------
 .../store/ischema/InfoSchemaFilterBuilder.java     |  10 +-
 .../store/ischema/InfoSchemaRecordGenerator.java   |  85 ++++++++++-
 .../drill/exec/store/ischema/InfoSchemaTable.java  | 102 +++++++++----
 .../exec/store/ischema/InfoSchemaTableType.java    |   8 +-
 .../apache/drill/exec/store/ischema/Records.java   |  44 +++++-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../test/java/org/apache/drill/TestDropTable.java  |   2 -
 .../exec/impersonation/BaseTestImpersonation.java  |  15 +-
 .../impersonation/TestImpersonationMetadata.java   |  33 ++--
 .../org/apache/drill/exec/sql/TestInfoSchema.java  |  16 +-
 .../drill/exec/store/ischema/TestFilesTable.java   | 169 +++++++++++++++++++++
 .../exec/work/metadata/TestMetadataProvider.java   |  14 +-
 22 files changed, 603 insertions(+), 334 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9bec393..8eacded 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -719,4 +719,6 @@ public final class ExecConstants {
   public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators";
   public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);
 
+  public static final String LIST_FILES_RECURSIVELY = "storage.list_files_recursively";
+  public static final BooleanValidator LIST_FILES_RECURSIVELY_VALIDATOR = new BooleanValidator(LIST_FILES_RECURSIVELY);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index d96f3e1..32768f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -23,7 +23,6 @@ import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_I
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_COLUMNS;
 
 import java.util.List;
 
@@ -45,6 +44,7 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.SqlConverter;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.google.common.collect.ImmutableList;
@@ -56,17 +56,17 @@ public class DescribeTableHandler extends DefaultSqlHandler {
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.COLUMNS ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     DrillSqlDescribeTable node = unwrap(sqlNode, DrillSqlDescribeTable.class);
 
     try {
       List<SqlNode> selectList =
-          ImmutableList.of((SqlNode) new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                                     new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
-                                     new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
+          ImmutableList.of(new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+                           new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
+                           new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
 
       SqlNode fromClause = new SqlIdentifier(
-          ImmutableList.of(IS_SCHEMA_NAME, TAB_COLUMNS), null, SqlParserPos.ZERO, null);
+          ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), null, SqlParserPos.ZERO, null);
 
       final SqlIdentifier table = node.getTable();
       final SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
deleted file mode 100644
index 307b01d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.planner.sql.DirectPlan;
-import org.apache.drill.exec.planner.sql.SchemaUtilites;
-import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.FileSystemUtil;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-
-public class ShowFileHandler extends DefaultSqlHandler {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
-
-  public ShowFileHandler(SqlHandlerConfig config) {
-    super(config);
-  }
-
-  @Override
-  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException {
-
-    SqlIdentifier from = ((SqlShowFiles) sqlNode).getDb();
-
-    DrillFileSystem fs;
-    String defaultLocation;
-    String fromDir = "./";
-
-    SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
-    SchemaPlus drillSchema = defaultSchema;
-
-    // Show files can be used without from clause, in which case we display the files in the default schema
-    if (from != null) {
-      // We are not sure if the full from clause is just the schema or includes table name,
-      // first try to see if the full path specified is a schema
-      drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names);
-      if (drillSchema == null) {
-        // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
-        drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
-        fromDir = fromDir + from.names.get((from.names.size() - 1));
-      }
-
-      if (drillSchema == null) {
-        throw UserException.validationError()
-            .message("Invalid FROM/IN clause [%s]", from.toString())
-            .build(logger);
-      }
-    }
-
-    WorkspaceSchema wsSchema;
-    try {
-       wsSchema = (WorkspaceSchema) drillSchema.unwrap(AbstractSchema.class).getDefaultSchema();
-    } catch (ClassCastException e) {
-      throw UserException.validationError()
-          .message("SHOW FILES is supported in workspace type schema only. Schema [%s] is not a workspace schema.",
-              SchemaUtilites.getSchemaPath(drillSchema))
-          .build(logger);
-    }
-
-    // Get the file system object
-    fs = wsSchema.getFS();
-
-    // Get the default path
-    defaultLocation = wsSchema.getDefaultLocation();
-
-    List<ShowFilesCommandResult> rows = new ArrayList<>();
-
-    for (FileStatus fileStatus : FileSystemUtil.listAll(fs, new Path(defaultLocation, fromDir), false)) {
-      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDirectory(),
-                                                                 fileStatus.isFile(), fileStatus.getLen(),
-                                                                 fileStatus.getOwner(), fileStatus.getGroup(),
-                                                                 fileStatus.getPermission().toString(),
-                                                                 fileStatus.getAccessTime(), fileStatus.getModificationTime());
-      rows.add(result);
-    }
-    return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows, ShowFilesCommandResult.class);
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java
deleted file mode 100644
index 7d16388..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.sql.Timestamp;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-public class ShowFilesCommandResult {
-
-  /* Fields that will be returned as columns
-   * for a 'SHOW FILES' command
-   */
-
-  // Name of the file
-  public String name;
-
-  // Is it a directory
-  public boolean isDirectory;
-
-  // Is it a file
-  public boolean isFile;
-
-  // Length of the file
-  public long length;
-
-  // File owner
-  public String owner;
-
-  // File group
-  public String group;
-
-  // File permissions
-  public String permissions;
-
-  // Access Time
-  public Timestamp accessTime;
-
-  // Modification Time
-  public Timestamp modificationTime;
-
-  public ShowFilesCommandResult(String name,
-                                boolean isDirectory,
-                                boolean isFile,
-                                long length,
-                                String owner,
-                                String group,
-                                String permissions,
-                                long accessTime,
-                                long modificationTime) {
-    this.name = name;
-    this.isDirectory = isDirectory;
-    this.isFile = isFile;
-    this.length = length;
-    this.owner = owner;
-    this.group = group;
-    this.permissions = permissions;
-
-    // Get the timestamp in UTC because Drill's internal TIMESTAMP stores time in UTC
-    DateTime at = new DateTime(accessTime).withZoneRetainFields(DateTimeZone.UTC);
-    this.accessTime = new Timestamp(at.getMillis());
-
-    DateTime mt = new DateTime(modificationTime).withZoneRetainFields(DateTimeZone.UTC);
-    this.modificationTime = new Timestamp(mt.getMillis());
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
new file mode 100644
index 0000000..c9bac32
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
+import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_RELATIVE_PATH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
+
+
+public class ShowFilesHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
+
+  public ShowFilesHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.FILES ... */
+  @Override
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
+
+    List<SqlNode> selectList = Collections.singletonList(SqlIdentifier.star(SqlParserPos.ZERO));
+
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.FILES.name()), SqlParserPos.ZERO);
+
+    SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
+    SchemaPlus drillSchema = defaultSchema;
+
+    SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class);
+    SqlIdentifier from = showFiles.getDb();
+    boolean addRelativePathLikeClause = false;
+
+    // Show files can be used without from clause, in which case we display the files in the default schema
+    if (from != null) {
+      // We are not sure if the full from clause is just the schema or includes table name,
+      // first try to see if the full path specified is a schema
+      drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names);
+      if (drillSchema == null) {
+        // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
+        drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
+        addRelativePathLikeClause = true;
+      }
+
+      if (drillSchema == null) {
+        throw UserException.validationError()
+            .message("Invalid FROM/IN clause [%s]", from.toString())
+            .build(logger);
+      }
+    }
+
+    String fullSchemaName;
+
+    try {
+      WorkspaceSchema wsSchema = (WorkspaceSchema) drillSchema.unwrap(AbstractSchema.class).getDefaultSchema();
+      fullSchemaName = wsSchema.getFullSchemaName();
+    } catch (ClassCastException e) {
+      throw UserException.validationError()
+          .message("SHOW FILES is supported in workspace type schema only. Schema [%s] is not a workspace schema.",
+              SchemaUtilites.getSchemaPath(drillSchema))
+          .build(logger);
+    }
+
+    SqlNode whereClause = DrillParserUtil.createCondition(new SqlIdentifier(FILES_COL_SCHEMA_NAME, SqlParserPos.ZERO),
+        SqlStdOperatorTable.EQUALS, SqlLiteral.createCharString(fullSchemaName, SqlParserPos.ZERO));
+
+    // listing for specific directory: show files in dfs.tmp.specific_directory
+    if (addRelativePathLikeClause) {
+      if (!context.getOptions().getBoolean(ExecConstants.LIST_FILES_RECURSIVELY)) {
+        throw UserException.validationError()
+            .message("To SHOW FILES in specific directory, enable option %s", ExecConstants.LIST_FILES_RECURSIVELY)
+            .build(logger);
+      }
+
+      // like clause: relative_path like 'specific_directory/%'
+      String folderPath = from.names.get(from.names.size() - 1);
+      folderPath = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+      SqlNode likeLiteral = SqlLiteral.createCharString(folderPath + "%", Util.getDefaultCharset().name(), SqlParserPos.ZERO);
+      SqlNode likeClause = DrillParserUtil.createCondition(new SqlIdentifier(FILES_COL_RELATIVE_PATH, SqlParserPos.ZERO),
+          SqlStdOperatorTable.LIKE, likeLiteral);
+
+      whereClause = DrillParserUtil.createCondition(whereClause, SqlStdOperatorTable.AND, likeClause);
+    }
+
+    return new SqlSelect(SqlParserPos.ZERO, null, new SqlNodeList(selectList, SqlParserPos.ZERO), fromClause, whereClause,
+        null, null, null, null, null, null);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
index 39c0f64..ab460ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
@@ -19,14 +19,12 @@ package org.apache.drill.exec.planner.sql.handlers;
 
 import java.util.List;
 
-import org.apache.calcite.tools.RelConversionException;
-
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_SCHEMATA;
 
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -43,13 +41,13 @@ public class ShowSchemasHandler extends DefaultSqlHandler {
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.SCHEMATA ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowSchemas node = unwrap(sqlNode, SqlShowSchemas.class);
     List<SqlNode> selectList =
-        ImmutableList.of((SqlNode) new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
+        ImmutableList.of(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
 
     SqlNode fromClause = new SqlIdentifier(
-        ImmutableList.of(IS_SCHEMA_NAME, TAB_SCHEMATA), null, SqlParserPos.ZERO, null);
+        ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), null, SqlParserPos.ZERO, null);
 
     SqlNode where = null;
     final SqlNode likePattern = node.getLikePattern();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index 58f205b..e73e829 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.sql.handlers;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_TABLES;
 
 import java.util.List;
 
@@ -43,6 +42,7 @@ import org.apache.drill.exec.planner.sql.SqlConverter;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.google.common.collect.ImmutableList;
@@ -55,7 +55,7 @@ public class ShowTablesHandler extends DefaultSqlHandler {
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.`TABLES` ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowTables node = unwrap(sqlNode, SqlShowTables.class);
     List<SqlNode> selectList = Lists.newArrayList();
     SqlNode fromClause;
@@ -65,7 +65,7 @@ public class ShowTablesHandler extends DefaultSqlHandler {
     selectList.add(new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO));
     selectList.add(new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
 
-    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, TAB_TABLES), SqlParserPos.ZERO);
+    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
 
     final SqlIdentifier db = node.getDb();
     String tableSchema;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
index 9b84a19..09a43f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
-import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler;
+import org.apache.drill.exec.planner.sql.handlers.ShowFilesHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -61,7 +61,7 @@ public class SqlShowFiles extends DrillSqlCall {
 
   @Override
   public List<SqlNode> getOperandList() {
-    return Collections.singletonList( (SqlNode) db);
+    return Collections.singletonList(db);
   }
 
   @Override
@@ -75,7 +75,7 @@ public class SqlShowFiles extends DrillSqlCall {
 
   @Override
   public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
-    return new ShowFileHandler(config);
+    return new ShowFilesHandler(config);
   }
 
   public SqlIdentifier getDb() { return db; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a627821..e3f21e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -239,6 +239,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.LIST_FILES_RECURSIVELY_VALIDATOR)
     };
 
     CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index d9f2ff7..15bdcd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -17,55 +17,42 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-public final class InfoSchemaConstants {
-  /** Prevents instantiation. */
-  private InfoSchemaConstants() {
-  }
+public interface InfoSchemaConstants {
 
   /** Name of catalog containing information schema. */
-  public static final String IS_CATALOG_NAME = "DRILL";
+  String IS_CATALOG_NAME = "DRILL";
 
   /** Catalog description */
-  public static final String IS_CATALOG_DESCR = "The internal metadata used by Drill";
+  String IS_CATALOG_DESCRIPTION = "The internal metadata used by Drill";
 
   /** Catalog connect string. Currently empty */
-  public static final String IS_CATALOG_CONNECT = "";
+   String IS_CATALOG_CONNECT = "";
 
   /** Name of information schema. */
-  public static final String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
-
-  // TODO:  Resolve how to not have two different place defining table names:
-  // NOTE: These string values have to match the identifiers for SelectedTable's
-  // enumerators.
-  // Information schema's tables' names:
-  public static final String TAB_CATALOGS = "CATALOGS";
-  public static final String TAB_COLUMNS = "COLUMNS";
-  public static final String TAB_SCHEMATA = "SCHEMATA";
-  public static final String TAB_TABLES = "TABLES";
-  public static final String TAB_VIEWS = "VIEWS";
+   String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
 
   // CATALOGS column names:
-  public static final String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
-  public static final String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
-  public static final String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
+   String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
+   String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
+   String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
 
   // SCHEMATA column names:
-  public static final String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
-  public static final String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
-  public static final String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
-  public static final String SCHS_COL_TYPE = "TYPE";
-  public static final String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
+   String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
+   String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
+   String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
+   String SCHS_COL_TYPE = "TYPE";
+   String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
 
   // Common TABLES / VIEWS / COLUMNS columns names:
-  public static final String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
-  public static final String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
-  public static final String SHRD_COL_TABLE_NAME = "TABLE_NAME";
+   String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
+   String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
+   String SHRD_COL_TABLE_NAME = "TABLE_NAME";
 
   // Remaining TABLES column names:
-  public static final String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
+   String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
 
   // Remaining VIEWS column names:
-  public static final String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
+   String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
 
   // COLUMNS columns, from SQL standard:
   // 1. TABLE_CATALOG
@@ -87,18 +74,31 @@ public final class InfoSchemaConstants {
   // 17. CHARACTER_SET_CATALOG ...
 
   // Remaining COLUMNS column names:
-  public static final String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
-  public static final String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
-  public static final String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
-  public static final String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
-  public static final String COLS_COL_DATA_TYPE = "DATA_TYPE";
-  public static final String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
-  public static final String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
-  public static final String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
-  public static final String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
-  public static final String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
-  public static final String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
-  public static final String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
-  public static final String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
+   String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
+   String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
+   String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
+   String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
+   String COLS_COL_DATA_TYPE = "DATA_TYPE";
+   String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
+   String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
+   String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
+   String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
+   String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
+   String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
+   String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
+   String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
 
+  // FILES column names:
+   String FILES_COL_SCHEMA_NAME = SCHS_COL_SCHEMA_NAME;
+   String FILES_COL_ROOT_SCHEMA_NAME = "ROOT_SCHEMA_NAME";
+   String FILES_COL_WORKSPACE_NAME = "WORKSPACE_NAME";
+   String FILES_COL_FILE_NAME = "FILE_NAME";
+   String FILES_COL_RELATIVE_PATH = "RELATIVE_PATH";
+   String FILES_COL_IS_DIRECTORY = "IS_DIRECTORY";
+   String FILES_COL_IS_FILE = "IS_FILE";
+   String FILES_COL_LENGTH = "LENGTH";
+   String FILES_COL_OWNER = "OWNER";
+   String FILES_COL_GROUP = "GROUP";
+   String FILES_COL_PERMISSION = "PERMISSION";
+   String FILES_COL_MODIFICATION_TIME = "MODIFICATION_TIME";
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
index f06ad84..40b4b8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
@@ -148,7 +150,9 @@ public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void,
           || field.equals(SCHS_COL_SCHEMA_NAME)
           || field.equals(SHRD_COL_TABLE_NAME)
           || field.equals(SHRD_COL_TABLE_SCHEMA)
-          || field.equals(COLS_COL_COLUMN_NAME)) {
+          || field.equals(COLS_COL_COLUMN_NAME)
+          || field.equals(FILES_COL_ROOT_SCHEMA_NAME)
+          || field.equals(FILES_COL_WORKSPACE_NAME)) {
         return new FieldExprNode(field);
       }
     }
@@ -168,7 +172,9 @@ public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void,
         || field.equals(SCHS_COL_SCHEMA_NAME)
         || field.equals(SHRD_COL_TABLE_NAME)
         || field.equals(SHRD_COL_TABLE_SCHEMA)
-        || field.equals(COLS_COL_COLUMN_NAME)) {
+        || field.equals(COLS_COL_COLUMN_NAME)
+        || field.equals(FILES_COL_ROOT_SCHEMA_NAME)
+        || field.equals(FILES_COL_WORKSPACE_NAME)) {
       return new FieldExprNode(field);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index d2c8c6f..d41428b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -20,14 +20,20 @@ package org.apache.drill.exec.store.ischema;
 import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +48,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
@@ -49,6 +56,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Generates records for POJO RecordReader by scanning the given schema. At every level (catalog, schema, table, field),
@@ -110,6 +120,9 @@ public abstract class InfoSchemaRecordGenerator<S> {
   public void visitField(String schemaName, String tableName, RelDataTypeField field) {
   }
 
+  public void visitFiles(String schemaName, SchemaPlus schema) {
+  }
+
   protected boolean shouldVisitCatalog() {
     if (filter == null) {
       return true;
@@ -189,6 +202,32 @@ public abstract class InfoSchemaRecordGenerator<S> {
     return filter.evaluate(recordValues) != Result.FALSE;
   }
 
+  protected boolean shouldVisitFiles(String schemaName, SchemaPlus schemaPlus) {
+    if (filter == null) {
+      return true;
+    }
+
+    AbstractSchema schema;
+    try {
+      schema = schemaPlus.unwrap(AbstractSchema.class);
+    } catch (ClassCastException e) {
+      return false;
+    }
+
+    if (!(schema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
+      return false;
+    }
+
+    WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
+
+    Map<String, String> recordValues = new HashMap<>();
+    recordValues.put(FILES_COL_SCHEMA_NAME, schemaName);
+    recordValues.put(FILES_COL_ROOT_SCHEMA_NAME, wsSchema.getSchemaPath().get(0));
+    recordValues.put(FILES_COL_WORKSPACE_NAME, wsSchema.getName());
+
+    return filter.evaluate(recordValues) != Result.FALSE;
+  }
+
   public abstract PojoRecordReader<S> getRecordReader();
 
   public void scanSchema(SchemaPlus root) {
@@ -207,7 +246,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
     // Recursively scan any subschema.
     for (String name: schema.getSubSchemaNames()) {
       scanSchema(schemaPath +
-          (schemaPath == "" ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
+          ("".equals(schemaPath) ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
           name, schema.getSubSchema(name));
     }
 
@@ -215,6 +254,10 @@ public abstract class InfoSchemaRecordGenerator<S> {
     if (shouldVisitSchema(schemaPath, schema) && visitSchema(schemaPath, schema)) {
       visitTables(schemaPath, schema);
     }
+
+    if (shouldVisitFiles(schemaPath, schema)) {
+      visitFiles(schemaPath, schema);
+    }
   }
 
   /**
@@ -256,7 +299,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public boolean visitCatalog() {
-      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCR, IS_CATALOG_CONNECT));
+      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCRIPTION, IS_CATALOG_CONNECT));
       return false;
     }
   }
@@ -316,7 +359,6 @@ public abstract class InfoSchemaRecordGenerator<S> {
           .checkNotNull(type, "Error. Type information for table %s.%s provided is null.", schemaName,
               tableName);
       records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, type.toString()));
-      return;
     }
 
     @Override
@@ -371,4 +413,39 @@ public abstract class InfoSchemaRecordGenerator<S> {
       records.add(new Records.Column(IS_CATALOG_NAME, schemaName, tableName, field));
     }
   }
+
+  public static class Files extends InfoSchemaRecordGenerator<Records.File> {
+
+    List<Records.File> records = new ArrayList<>();
+
+    public Files(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.File> getRecordReader() {
+      return new PojoRecordReader<>(Records.File.class, records);
+    }
+
+    @Override
+    public void visitFiles(String schemaName, SchemaPlus schemaPlus) {
+      try {
+        AbstractSchema schema = schemaPlus.unwrap(AbstractSchema.class);
+        if (schema instanceof WorkspaceSchemaFactory.WorkspaceSchema) {
+          WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
+          String defaultLocation = wsSchema.getDefaultLocation();
+          FileSystem fs = wsSchema.getFS();
+          boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
+          FileSystemUtil.listAll(fs, new Path(defaultLocation), recursive).forEach(
+              fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
+          );
+        }
+      } catch (ClassCastException | UnsupportedOperationException e) {
+        // ignore the exception since either this is not a Drill schema or schema does not support files listing
+      } catch (IOException e) {
+        logger.warn("Failure while trying to list files", e);
+      }
+    }
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
index 64b8c42..c6ebfcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
@@ -33,6 +33,18 @@ import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_N
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION_RADIX;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_SCALE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_ORDINAL_POSITION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_FILE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_GROUP;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_IS_DIRECTORY;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_IS_FILE;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_LENGTH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_MODIFICATION_TIME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_OWNER;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_PERMISSION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_RELATIVE_PATH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_IS_MUTABLE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
@@ -41,11 +53,6 @@ import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_T
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_CATALOG;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_CATALOGS;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_COLUMNS;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_SCHEMATA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_TABLES;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_VIEWS;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.VIEWS_COL_VIEW_DEFINITION;
 
@@ -63,8 +70,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
- * Base class for tables in INFORMATION_SCHEMA.  Defines the table (fields and
- * types).
+ * Base class for tables in INFORMATION_SCHEMA.  Defines the table (fields and types).
  */
 public abstract class InfoSchemaTable<S> {
 
@@ -80,30 +86,17 @@ public abstract class InfoSchemaTable<S> {
     }
   }
 
-  public static final MajorType VARCHAR = Types.required(MinorType.VARCHAR);
   public static final MajorType INT = Types.required(MinorType.INT);
+  public static final MajorType BIGINT = Types.required(MinorType.BIGINT);
+  public static final MajorType VARCHAR = Types.required(MinorType.VARCHAR);
+  public static final MajorType BIT = Types.required(MinorType.BIT);
 
-  private final String tableName;
   private final List<Field> fields;
 
-  public InfoSchemaTable(String tableName, List<Field> fields) {
-    this.tableName = tableName;
+  public InfoSchemaTable(List<Field> fields) {
     this.fields = fields;
   }
 
-  static public RelDataType getRelDataType(RelDataTypeFactory typeFactory, MajorType type) {
-    switch (type.getMinorType()) {
-    case INT:
-      return typeFactory.createSqlType(SqlTypeName.INTEGER);
-    case VARCHAR:
-      // Note:  Remember to not default to "VARCHAR(1)":
-      return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
-    default:
-      throw new UnsupportedOperationException(
-          "Only INT and VARCHAR types are supported in INFORMATION_SCHEMA");
-    }
-  }
-
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
 
     // Convert the array of Drill types to an array of Optiq types
@@ -117,10 +110,26 @@ public abstract class InfoSchemaTable<S> {
     return typeFactory.createStructType(relTypes, fieldNames);
   }
 
+  private RelDataType getRelDataType(RelDataTypeFactory typeFactory, MajorType type) {
+    switch (type.getMinorType()) {
+      case INT:
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+      case BIGINT:
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+      case VARCHAR:
+        // Note:  Remember to not default to "VARCHAR(1)":
+        return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+      case BIT:
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+      default:
+        throw new UnsupportedOperationException("Only INT, BIGINT, VARCHAR and BOOLEAN types are supported in " + InfoSchemaConstants.IS_SCHEMA_NAME);
+    }
+  }
+
   public abstract InfoSchemaRecordGenerator<S> getRecordGenerator(OptionManager optionManager);
 
   /** Layout for the CATALOGS table. */
-  static public class Catalogs extends InfoSchemaTable<Records.Catalog> {
+  public static class Catalogs extends InfoSchemaTable<Records.Catalog> {
     // NOTE:  Nothing seems to verify that the types here (apparently used
     // by SQL validation) match the types of the fields in Records.Catalogs).
     private static final List<Field> fields = ImmutableList.of(
@@ -128,8 +137,8 @@ public abstract class InfoSchemaTable<S> {
         Field.create(CATS_COL_CATALOG_DESCRIPTION, VARCHAR),
         Field.create(CATS_COL_CATALOG_CONNECT, VARCHAR));
 
-    Catalogs() {
-      super(TAB_CATALOGS, fields);
+    public Catalogs() {
+      super(fields);
     }
 
     @Override
@@ -150,7 +159,7 @@ public abstract class InfoSchemaTable<S> {
         Field.create(SCHS_COL_IS_MUTABLE, VARCHAR));
 
     public Schemata() {
-      super(TAB_SCHEMATA, fields);
+      super(fields);
     }
 
     @Override
@@ -170,7 +179,7 @@ public abstract class InfoSchemaTable<S> {
         Field.create(TBLS_COL_TABLE_TYPE, VARCHAR));
 
     public Tables() {
-      super(TAB_TABLES, fields);
+      super(fields);
     }
 
     @Override
@@ -180,7 +189,7 @@ public abstract class InfoSchemaTable<S> {
   }
 
   /** Layout for the VIEWS table. */
-  static public class Views extends InfoSchemaTable<Records.View> {
+  public static class Views extends InfoSchemaTable<Records.View> {
     // NOTE:  Nothing seems to verify that the types here (apparently used
     // by SQL validation) match the types of the fields in Records.Views).
     private static final List<Field> fields = ImmutableList.of(
@@ -190,7 +199,7 @@ public abstract class InfoSchemaTable<S> {
         Field.create(VIEWS_COL_VIEW_DEFINITION, VARCHAR));
 
     public Views() {
-      super(TAB_VIEWS, fields);
+      super(fields);
     }
 
     @Override
@@ -242,7 +251,7 @@ public abstract class InfoSchemaTable<S> {
         );
 
     public Columns() {
-      super(TAB_COLUMNS, fields);
+      super(fields);
     }
 
     @Override
@@ -250,4 +259,33 @@ public abstract class InfoSchemaTable<S> {
       return new InfoSchemaRecordGenerator.Columns(optionManager);
     }
   }
+
+  /** Layout for the FILES table. */
+  public static class Files extends InfoSchemaTable<Records.File> {
+
+    private static final List<Field> fields = ImmutableList.of(
+        Field.create(FILES_COL_SCHEMA_NAME, VARCHAR),
+        Field.create(FILES_COL_ROOT_SCHEMA_NAME, VARCHAR),
+        Field.create(FILES_COL_WORKSPACE_NAME, VARCHAR),
+        Field.create(FILES_COL_FILE_NAME, VARCHAR),
+        Field.create(FILES_COL_RELATIVE_PATH, VARCHAR),
+        Field.create(FILES_COL_IS_DIRECTORY, BIT),
+        Field.create(FILES_COL_IS_FILE, BIT),
+        Field.create(FILES_COL_LENGTH, BIGINT),
+        Field.create(FILES_COL_OWNER, VARCHAR),
+        Field.create(FILES_COL_GROUP, VARCHAR),
+        Field.create(FILES_COL_PERMISSION, VARCHAR),
+        Field.create(FILES_COL_MODIFICATION_TIME, VARCHAR)
+    );
+
+    public Files() {
+      super(fields);
+    }
+
+    @Override
+    public InfoSchemaRecordGenerator<Records.File> getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Files(optionManager);
+    }
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
index 37d1a6b..961b90d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
@@ -23,6 +23,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Catalogs;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Columns;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Files;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Schemata;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Tables;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Views;
@@ -32,14 +33,13 @@ import org.apache.drill.exec.store.pojo.PojoRecordReader;
  * The set of tables/views in INFORMATION_SCHEMA.
  */
 public enum InfoSchemaTableType {
-  // TODO:  Resolve how to not have two different place defining table names:
-  // NOTE: These identifiers have to match the string values in
-  // InfoSchemaConstants.
+
   CATALOGS(new Catalogs()),
   SCHEMATA(new Schemata()),
   VIEWS(new Views()),
   COLUMNS(new Columns()),
-  TABLES(new Tables());
+  TABLES(new Tables()),
+  FILES(new Files());
 
   private final InfoSchemaTable<?> tableDef;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
index c684e7a..6632296 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
@@ -24,10 +24,17 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
 import com.google.common.base.MoreObjects;
 
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
 public class Records {
 
   /** Pojo object for a record in INFORMATION_SCHEMA.TABLES */
@@ -543,4 +550,39 @@ public class Records {
       this.IS_MUTABLE = isMutable ? "YES" : "NO";
     }
   }
-}
+
+  /** Pojo object for a record in INFORMATION_SCHEMA.FILES */
+  public static class File {
+
+    public final String SCHEMA_NAME;
+    public final String ROOT_SCHEMA_NAME;
+    public final String WORKSPACE_NAME;
+    public final String FILE_NAME;
+    public final String RELATIVE_PATH;
+    public final boolean IS_DIRECTORY;
+    public final boolean IS_FILE;
+    public final long LENGTH;
+    public final String OWNER;
+    public final String GROUP;
+    public final String PERMISSION;
+    public final String MODIFICATION_TIME;
+
+    public File(String schemaName, WorkspaceSchemaFactory.WorkspaceSchema wsSchema, FileStatus fileStatus) {
+      this.SCHEMA_NAME = schemaName;
+      this.ROOT_SCHEMA_NAME = wsSchema.getSchemaPath().get(0);
+      this.WORKSPACE_NAME = wsSchema.getName();
+      this.FILE_NAME = fileStatus.getPath().getName();
+      this.RELATIVE_PATH = Path.getPathWithoutSchemeAndAuthority(new Path(wsSchema.getDefaultLocation())).toUri()
+        .relativize(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toUri()).getPath();
+      this.IS_DIRECTORY = fileStatus.isDirectory();
+      this.IS_FILE = fileStatus.isFile();
+      this.LENGTH = fileStatus.getLen();
+      this.OWNER = fileStatus.getOwner();
+      this.GROUP = fileStatus.getGroup();
+      this.PERMISSION = fileStatus.getPermission().toString();
+      this.MODIFICATION_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(ZoneOffset.UTC)
+          .format(Instant.ofEpochMilli(fileStatus.getModificationTime()));
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 16a285b..06c6978 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -605,4 +605,5 @@ drill.exec.options: {
     store.kafka.poll.timeout: 200,
     web.logs.max_lines: 10000,
     window.enable: true,
+    storage.list_files_recursively: false
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index 052b761..6313d74 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -129,8 +129,6 @@ public class TestDropTable extends PlanTestBase {
     final String nestedJsonTable = tableName + Path.SEPARATOR + "json_table";
     test(CREATE_SIMPLE_TABLE, BACK_TICK + nestedJsonTable + BACK_TICK);
 
-    test("show files from " + tableName);
-
     boolean dropFailed = false;
     // this should fail, because the directory contains non-homogenous files
     try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 2b112e2..3b25ddb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class BaseTestImpersonation extends PlanTestBase {
   protected static final String MINIDFS_STORAGE_PLUGIN_NAME = "miniDfsPlugin";
@@ -137,9 +138,19 @@ public class BaseTestImpersonation extends PlanTestBase {
 
   protected static void createAndAddWorkspace(String name, String path, short permissions, String owner,
       String group, final Map<String, WorkspaceConfig> workspaces) throws Exception {
-    final Path dirPath = new Path(path);
-    FileSystem.mkdirs(fs, dirPath, new FsPermission(permissions));
+
+    FsPermission permission = new FsPermission(permissions);
+
+    Path dirPath = new Path(path);
+    assertTrue(FileSystem.mkdirs(fs, dirPath, permission));
     fs.setOwner(dirPath, owner, group);
+
+    // create sample file in the workspace to check show files command
+    Path sampleFile = new Path(dirPath, String.format("sample_%s.txt", name));
+    assertTrue(fs.createNewFile(sampleFile));
+    fs.setPermission(sampleFile, permission);
+    fs.setOwner(sampleFile, owner, group);
+
     final WorkspaceConfig ws = new WorkspaceConfig(path, true, "parquet", false);
     workspaces.put(name, ws);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index 2eb55db..08c09d1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -23,6 +23,7 @@ import org.apache.drill.categories.SecurityTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.categories.SlowTest;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@ import org.junit.experimental.categories.Category;
 import java.util.Map;
 
 import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -154,37 +156,34 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
     updateClient(user1);
 
     // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-    test("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME);
+    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
 
-    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for
-    // "user1"
-    test("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME);
+    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for "user1"
+    count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
     updateClient(user2);
-    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part
-    // of the "group0"
-    test("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME);
+    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part of the "group0"
+    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
-    UserRemoteException ex = null;
-
     updateClient(user2);
+
     try {
+      setSessionOption(ExecConstants.LIST_FILES_RECURSIVELY, true);
       // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-      test("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME);
-    } catch(UserRemoteException e) {
-      ex = e;
+      int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+      assertEquals("Counts should match", 0, count);
+    } finally {
+      resetSessionOption(ExecConstants.LIST_FILES_RECURSIVELY);
     }
-
-    assertNotNull("UserRemoteException is expected", ex);
-    assertThat(ex.getMessage(),
-        containsString("Permission denied: user=drillTestUser2, " +
-            "access=READ_EXECUTE, inode=\"/drillTestGrp1_700\":drillTestUser1:drillTestGrp1:drwx------"));
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 3932d7e..6e7d054 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -69,6 +69,7 @@ public class TestInfoSchema extends BaseTestQuery {
     test("select * from INFORMATION_SCHEMA.VIEWS");
     test("select * from INFORMATION_SCHEMA.`TABLES`");
     test("select * from INFORMATION_SCHEMA.COLUMNS");
+    test("select * from INFORMATION_SCHEMA.`FILES`");
   }
 
   @Test
@@ -89,7 +90,8 @@ public class TestInfoSchema extends BaseTestQuery {
             new String[] { "INFORMATION_SCHEMA", "COLUMNS" },
             new String[] { "INFORMATION_SCHEMA", "TABLES" },
             new String[] { "INFORMATION_SCHEMA", "CATALOGS" },
-            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" }
+            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" },
+            new String[] { "INFORMATION_SCHEMA", "FILES" }
         );
 
     final TestBuilder t1 = testBuilder()
@@ -364,18 +366,6 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
-  public void showFiles() throws Exception {
-    test("show files from dfs.`%s`", TEST_SUB_DIR);
-    test("show files from `dfs.default`.`%s`", TEST_SUB_DIR);
-  }
-
-  @Test
-  public void showFilesWithDefaultSchema() throws Exception{
-    test("USE dfs.`default`");
-    test("SHOW FILES FROM `%s`", TEST_SUB_DIR);
-  }
-
-  @Test
   public void describeSchemaSyntax() throws Exception {
     test("describe schema dfs");
     test("describe schema dfs.`default`");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java
new file mode 100644
index 0000000..f8ea9a1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(SqlTest.class)
+public class TestFilesTable extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+
+    // create one workspace named files
+    File filesWorkspace = cluster.makeDataDir("files", null, null);
+
+    // add data to the workspace: one file and folder with one file
+    assertTrue(new File(filesWorkspace, "file1.txt").createNewFile());
+    File folder = new File(filesWorkspace, "folder");
+    assertTrue(folder.mkdir());
+    assertTrue(new File(folder, "file2.txt").createNewFile());
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSelectWithoutRecursion() throws Exception {
+    client.testBuilder()
+        .sqlQuery("select schema_name, root_schema_name, workspace_name, file_name, relative_path, is_directory, is_file from INFORMATION_SCHEMA.`FILES`")
+        .unOrdered()
+        .baselineColumns("schema_name", "root_schema_name", "workspace_name", "file_name", "relative_path", "is_directory", "is_file")
+        .baselineValues("dfs.files", "dfs", "files", "file1.txt", "file1.txt", false, true)
+        .baselineValues("dfs.files", "dfs", "files", "folder", "folder", true, false)
+        .go();
+  }
+
+  @Test
+  public void testSelectWithRecursion() throws Exception {
+    try {
+      client.alterSession(ExecConstants.LIST_FILES_RECURSIVELY, true);
+      client.testBuilder()
+          .sqlQuery("select schema_name, root_schema_name, workspace_name, file_name, relative_path, is_directory, is_file from INFORMATION_SCHEMA.`FILES`")
+          .unOrdered()
+          .baselineColumns("schema_name", "root_schema_name", "workspace_name", "file_name", "relative_path", "is_directory", "is_file")
+          .baselineValues("dfs.files", "dfs", "files", "file1.txt", "file1.txt", false, true)
+          .baselineValues("dfs.files", "dfs", "files", "folder", "folder", true, false)
+          .baselineValues("dfs.files", "dfs", "files", "file2.txt", "folder/file2.txt", false, true)
+          .go();
+    } finally {
+      client.resetSession(ExecConstants.LIST_FILES_RECURSIVELY);
+    }
+
+  }
+
+  @Test
+  public void testShowFilesWithInCondition() throws Exception {
+    client.testBuilder()
+        .sqlQuery("show files in dfs.`files`")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesForSpecificFolderSuccess() throws Exception {
+    try {
+      client.alterSession(ExecConstants.LIST_FILES_RECURSIVELY, true);
+      client.testBuilder()
+          .sqlQuery("show files in dfs.`files`.folder")
+          .unOrdered()
+          .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files' and relative_path like 'folder/%'")
+          .go();
+    } finally {
+      client.resetSession(ExecConstants.LIST_FILES_RECURSIVELY);
+    }
+  }
+
+  @Test
+  public void testShowFilesForSpecificFolderFailure() throws Exception {
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(String.format("To SHOW FILES in specific directory, enable option %s", ExecConstants.LIST_FILES_RECURSIVELY));
+    queryBuilder().sql("show files in dfs.`files`.folder").run();
+  }
+
+  @Test
+  public void testShowFilesWithUseClause() throws Exception {
+    queryBuilder().sql("use dfs.`files`").run();
+    client.testBuilder()
+        .sqlQuery("show files")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesWithPartialUseClause() throws Exception {
+    queryBuilder().sql("use dfs").run();
+    client.testBuilder()
+        .sqlQuery("show files in `files`")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesForDefaultSchema() throws Exception {
+    queryBuilder().sql("use dfs").run();
+    client.testBuilder()
+        .sqlQuery("show files")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.default'")
+        .go();
+  }
+
+  @Test
+  public void testFilterPushDown_None() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where file_name = 'file1.txt'").explainText();
+    assertTrue(plan.contains("filter=null"));
+    assertTrue(plan.contains("Filter(condition="));
+  }
+
+  @Test
+  public void testFilterPushDown_Partial() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files' and file_name = 'file1.txt'").explainText();
+    assertTrue(plan.contains("filter=booleanand(equal(Field=SCHEMA_NAME,Literal=dfs.files))"));
+    assertTrue(plan.contains("Filter(condition="));
+  }
+
+  @Test
+  public void testFilterPushDown_Full() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'").explainText();
+    assertTrue(plan.contains("filter=equal(Field=SCHEMA_NAME,Literal=dfs.files)"));
+    assertFalse(plan.contains("Filter(condition="));
+  }
+
+}
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index 4574425..8d34ebc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.work.metadata;
 
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,7 +61,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     CatalogMetadata c = catalogs.get(0);
     assertEquals(IS_CATALOG_NAME, c.getCatalogName());
-    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_DESCRIPTION, c.getDescription());
     assertEquals(IS_CATALOG_CONNECT, c.getConnect());
   }
 
@@ -78,7 +78,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     CatalogMetadata c = catalogs.get(0);
     assertEquals(IS_CATALOG_NAME, c.getCatalogName());
-    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_DESCRIPTION, c.getDescription());
     assertEquals(IS_CATALOG_CONNECT, c.getConnect());
   }
 
@@ -149,13 +149,14 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(17, tables.size());
+    assertEquals(18, tables.size());
 
     verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
     verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
     verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
     verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
     verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -186,13 +187,14 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(17, tables.size());
+    assertEquals(18, tables.size());
 
     verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
     verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
     verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
     verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
     verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -248,7 +250,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(118, columns.size());
+    assertEquals(130, columns.size());
     // too many records to verify the output.
   }