Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/15 15:42:41 UTC

[GitHub] arina-ielchiieva closed pull request #1430: DRILL-6680: Expose show files command into INFORMATION_SCHEMA

URL: https://github.com/apache/drill/pull/1430

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9bec3933a07..8eacded538a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -719,4 +719,6 @@ public static String bootDefaultFor(String name) {
   public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators";
   public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);
 
+  public static final String LIST_FILES_RECURSIVELY = "storage.list_files_recursively";
+  public static final BooleanValidator LIST_FILES_RECURSIVELY_VALIDATOR = new BooleanValidator(LIST_FILES_RECURSIVELY);
 }
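
The new `storage.list_files_recursively` option controls whether files are
listed recursively when the FILES table is scanned. It is registered in
SystemOptionManager and given a default of false in drill-module.conf further
down in this diff, and SHOW FILES for a specific directory is rejected unless
it is enabled (see ShowFilesHandler below). At runtime it can be toggled with
the usual option syntax, e.g. ALTER SESSION SET `storage.list_files_recursively` = true;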
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index d96f3e14026..32768f8222e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -23,7 +23,6 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_COLUMNS;
 
 import java.util.List;
 
@@ -45,6 +44,7 @@
 import org.apache.drill.exec.planner.sql.SqlConverter;
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.google.common.collect.ImmutableList;
@@ -56,17 +56,17 @@
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.COLUMNS ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     DrillSqlDescribeTable node = unwrap(sqlNode, DrillSqlDescribeTable.class);
 
     try {
       List<SqlNode> selectList =
-          ImmutableList.of((SqlNode) new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
-                                     new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
-                                     new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
+          ImmutableList.of(new SqlIdentifier(COLS_COL_COLUMN_NAME, SqlParserPos.ZERO),
+                           new SqlIdentifier(COLS_COL_DATA_TYPE, SqlParserPos.ZERO),
+                           new SqlIdentifier(COLS_COL_IS_NULLABLE, SqlParserPos.ZERO));
 
       SqlNode fromClause = new SqlIdentifier(
-          ImmutableList.of(IS_SCHEMA_NAME, TAB_COLUMNS), null, SqlParserPos.ZERO, null);
+          ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.COLUMNS.name()), null, SqlParserPos.ZERO, null);
 
       final SqlIdentifier table = node.getTable();
       final SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
deleted file mode 100644
index 307b01dd52c..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.planner.sql.DirectPlan;
-import org.apache.drill.exec.planner.sql.SchemaUtilites;
-import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.FileSystemUtil;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-
-public class ShowFileHandler extends DefaultSqlHandler {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
-
-  public ShowFileHandler(SqlHandlerConfig config) {
-    super(config);
-  }
-
-  @Override
-  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException {
-
-    SqlIdentifier from = ((SqlShowFiles) sqlNode).getDb();
-
-    DrillFileSystem fs;
-    String defaultLocation;
-    String fromDir = "./";
-
-    SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
-    SchemaPlus drillSchema = defaultSchema;
-
-    // Show files can be used without from clause, in which case we display the files in the default schema
-    if (from != null) {
-      // We are not sure if the full from clause is just the schema or includes table name,
-      // first try to see if the full path specified is a schema
-      drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names);
-      if (drillSchema == null) {
-        // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
-        drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
-        fromDir = fromDir + from.names.get((from.names.size() - 1));
-      }
-
-      if (drillSchema == null) {
-        throw UserException.validationError()
-            .message("Invalid FROM/IN clause [%s]", from.toString())
-            .build(logger);
-      }
-    }
-
-    WorkspaceSchema wsSchema;
-    try {
-       wsSchema = (WorkspaceSchema) drillSchema.unwrap(AbstractSchema.class).getDefaultSchema();
-    } catch (ClassCastException e) {
-      throw UserException.validationError()
-          .message("SHOW FILES is supported in workspace type schema only. Schema [%s] is not a workspace schema.",
-              SchemaUtilites.getSchemaPath(drillSchema))
-          .build(logger);
-    }
-
-    // Get the file system object
-    fs = wsSchema.getFS();
-
-    // Get the default path
-    defaultLocation = wsSchema.getDefaultLocation();
-
-    List<ShowFilesCommandResult> rows = new ArrayList<>();
-
-    for (FileStatus fileStatus : FileSystemUtil.listAll(fs, new Path(defaultLocation, fromDir), false)) {
-      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDirectory(),
-                                                                 fileStatus.isFile(), fileStatus.getLen(),
-                                                                 fileStatus.getOwner(), fileStatus.getGroup(),
-                                                                 fileStatus.getPermission().toString(),
-                                                                 fileStatus.getAccessTime(), fileStatus.getModificationTime());
-      rows.add(result);
-    }
-    return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows, ShowFilesCommandResult.class);
-  }
-}
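
Note the design shift here: the deleted handler executed the listing eagerly
and returned the rows as a DirectPlan, bypassing normal query machinery. The
replacement handler added below instead rewrites SHOW FILES into a query over
INFORMATION_SCHEMA.FILES, so the command reuses the information-schema filter
push-down and the regular execution path.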
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java
deleted file mode 100644
index 7d163884493..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesCommandResult.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.sql.handlers;
-
-import java.sql.Timestamp;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-public class ShowFilesCommandResult {
-
-  /* Fields that will be returned as columns
-   * for a 'SHOW FILES' command
-   */
-
-  // Name of the file
-  public String name;
-
-  // Is it a directory
-  public boolean isDirectory;
-
-  // Is it a file
-  public boolean isFile;
-
-  // Length of the file
-  public long length;
-
-  // File owner
-  public String owner;
-
-  // File group
-  public String group;
-
-  // File permissions
-  public String permissions;
-
-  // Access Time
-  public Timestamp accessTime;
-
-  // Modification Time
-  public Timestamp modificationTime;
-
-  public ShowFilesCommandResult(String name,
-                                boolean isDirectory,
-                                boolean isFile,
-                                long length,
-                                String owner,
-                                String group,
-                                String permissions,
-                                long accessTime,
-                                long modificationTime) {
-    this.name = name;
-    this.isDirectory = isDirectory;
-    this.isFile = isFile;
-    this.length = length;
-    this.owner = owner;
-    this.group = group;
-    this.permissions = permissions;
-
-    // Get the timestamp in UTC because Drill's internal TIMESTAMP stores time in UTC
-    DateTime at = new DateTime(accessTime).withZoneRetainFields(DateTimeZone.UTC);
-    this.accessTime = new Timestamp(at.getMillis());
-
-    DateTime mt = new DateTime(modificationTime).withZoneRetainFields(DateTimeZone.UTC);
-    this.modificationTime = new Timestamp(mt.getMillis());
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
new file mode 100644
index 00000000000..c9bac32f21e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFilesHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
+import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_RELATIVE_PATH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
+
+
+public class ShowFilesHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
+
+  public ShowFilesHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.FILES ... */
+  @Override
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
+
+    List<SqlNode> selectList = Collections.singletonList(SqlIdentifier.star(SqlParserPos.ZERO));
+
+    SqlNode fromClause = new SqlIdentifier(Arrays.asList(IS_SCHEMA_NAME, InfoSchemaTableType.FILES.name()), SqlParserPos.ZERO);
+
+    SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
+    SchemaPlus drillSchema = defaultSchema;
+
+    SqlShowFiles showFiles = unwrap(sqlNode, SqlShowFiles.class);
+    SqlIdentifier from = showFiles.getDb();
+    boolean addRelativePathLikeClause = false;
+
+    // Show files can be used without from clause, in which case we display the files in the default schema
+    if (from != null) {
+      // We are not sure if the full from clause is just the schema or includes table name,
+      // first try to see if the full path specified is a schema
+      drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names);
+      if (drillSchema == null) {
+        // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
+        drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
+        addRelativePathLikeClause = true;
+      }
+
+      if (drillSchema == null) {
+        throw UserException.validationError()
+            .message("Invalid FROM/IN clause [%s]", from.toString())
+            .build(logger);
+      }
+    }
+
+    String fullSchemaName;
+
+    try {
+      WorkspaceSchema wsSchema = (WorkspaceSchema) drillSchema.unwrap(AbstractSchema.class).getDefaultSchema();
+      fullSchemaName = wsSchema.getFullSchemaName();
+    } catch (ClassCastException e) {
+      throw UserException.validationError()
+          .message("SHOW FILES is supported in workspace type schema only. Schema [%s] is not a workspace schema.",
+              SchemaUtilites.getSchemaPath(drillSchema))
+          .build(logger);
+    }
+
+    SqlNode whereClause = DrillParserUtil.createCondition(new SqlIdentifier(FILES_COL_SCHEMA_NAME, SqlParserPos.ZERO),
+        SqlStdOperatorTable.EQUALS, SqlLiteral.createCharString(fullSchemaName, SqlParserPos.ZERO));
+
+    // listing for specific directory: show files in dfs.tmp.specific_directory
+    if (addRelativePathLikeClause) {
+      if (!context.getOptions().getBoolean(ExecConstants.LIST_FILES_RECURSIVELY)) {
+        throw UserException.validationError()
+            .message("To SHOW FILES in specific directory, enable option %s", ExecConstants.LIST_FILES_RECURSIVELY)
+            .build(logger);
+      }
+
+      // like clause: relative_path like 'specific_directory/%'
+      String folderPath = from.names.get(from.names.size() - 1);
+      folderPath = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+      SqlNode likeLiteral = SqlLiteral.createCharString(folderPath + "%", Util.getDefaultCharset().name(), SqlParserPos.ZERO);
+      SqlNode likeClause = DrillParserUtil.createCondition(new SqlIdentifier(FILES_COL_RELATIVE_PATH, SqlParserPos.ZERO),
+          SqlStdOperatorTable.LIKE, likeLiteral);
+
+      whereClause = DrillParserUtil.createCondition(whereClause, SqlStdOperatorTable.AND, likeClause);
+    }
+
+    return new SqlSelect(SqlParserPos.ZERO, null, new SqlNodeList(selectList, SqlParserPos.ZERO), fromClause, whereClause,
+        null, null, null, null, null, null);
+  }
+}
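
For clarity, the rewrite above is equivalent to generating the following kind
of query. This is a minimal plain-JDK sketch that renders the SQL as text; the
real handler builds a Calcite SqlSelect tree instead of a string, and the
schema and directory names below are illustrative:

public class ShowFilesRewriteSketch {

  // Renders the INFORMATION_SCHEMA query that SHOW FILES is rewritten into.
  static String rewrite(String fullSchemaName, String directory) {
    StringBuilder sql = new StringBuilder("SELECT * FROM INFORMATION_SCHEMA.FILES")
        .append(" WHERE SCHEMA_NAME = '").append(fullSchemaName).append("'");
    if (directory != null) {
      // mirrors the LIKE clause added on RELATIVE_PATH for a specific directory
      String folder = directory.endsWith("/") ? directory : directory + "/";
      sql.append(" AND RELATIVE_PATH LIKE '").append(folder).append("%'");
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    // SHOW FILES IN dfs.tmp
    System.out.println(rewrite("dfs.tmp", null));
    // SHOW FILES IN dfs.tmp.specific_directory
    System.out.println(rewrite("dfs.tmp", "specific_directory"));
  }
}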
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
index 39c0f644dfa..ab460ad92f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
@@ -19,14 +19,12 @@
 
 import java.util.List;
 
-import org.apache.calcite.tools.RelConversionException;
-
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_SCHEMATA;
 
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -43,13 +41,13 @@
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.SCHEMATA ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowSchemas node = unwrap(sqlNode, SqlShowSchemas.class);
     List<SqlNode> selectList =
-        ImmutableList.of((SqlNode) new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
+        ImmutableList.of(new SqlIdentifier(SCHS_COL_SCHEMA_NAME, SqlParserPos.ZERO));
 
     SqlNode fromClause = new SqlIdentifier(
-        ImmutableList.of(IS_SCHEMA_NAME, TAB_SCHEMATA), null, SqlParserPos.ZERO, null);
+        ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.SCHEMATA.name()), null, SqlParserPos.ZERO, null);
 
     SqlNode where = null;
     final SqlNode likePattern = node.getLikePattern();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index 58f205bd4a4..e73e829a96d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -20,7 +20,6 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_TABLES;
 
 import java.util.List;
 
@@ -43,6 +42,7 @@
 import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.ischema.InfoSchemaTableType;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.google.common.collect.ImmutableList;
@@ -55,7 +55,7 @@
 
   /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.`TABLES` ... */
   @Override
-  public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSetupException {
+  public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {
     SqlShowTables node = unwrap(sqlNode, SqlShowTables.class);
     List<SqlNode> selectList = Lists.newArrayList();
     SqlNode fromClause;
@@ -65,7 +65,7 @@ public SqlNode rewrite(SqlNode sqlNode) throws RelConversionException, ForemanSe
     selectList.add(new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO));
     selectList.add(new SqlIdentifier(SHRD_COL_TABLE_NAME, SqlParserPos.ZERO));
 
-    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, TAB_TABLES), SqlParserPos.ZERO);
+    fromClause = new SqlIdentifier(ImmutableList.of(IS_SCHEMA_NAME, InfoSchemaTableType.TABLES.name()), SqlParserPos.ZERO);
 
     final SqlIdentifier db = node.getDb();
     String tableSchema;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
index 9b84a1982eb..09a43f5304c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
@@ -21,7 +21,7 @@
 import java.util.List;
 
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
-import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler;
+import org.apache.drill.exec.planner.sql.handlers.ShowFilesHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -61,7 +61,7 @@ public SqlOperator getOperator() {
 
   @Override
   public List<SqlNode> getOperandList() {
-    return Collections.singletonList( (SqlNode) db);
+    return Collections.singletonList(db);
   }
 
   @Override
@@ -75,7 +75,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 
   @Override
   public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
-    return new ShowFileHandler(config);
+    return new ShowFilesHandler(config);
   }
 
   public SqlIdentifier getDb() { return db; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a627821dd37..e3f21e93b7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -239,6 +239,7 @@
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.LIST_FILES_RECURSIVELY_VALIDATOR)
     };
 
     CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index d9f2ff74526..15bdcd9b364 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -17,55 +17,42 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-public final class InfoSchemaConstants {
-  /** Prevents instantiation. */
-  private InfoSchemaConstants() {
-  }
+public interface InfoSchemaConstants {
 
   /** Name of catalog containing information schema. */
-  public static final String IS_CATALOG_NAME = "DRILL";
+  String IS_CATALOG_NAME = "DRILL";
 
   /** Catalog description */
-  public static final String IS_CATALOG_DESCR = "The internal metadata used by Drill";
+  String IS_CATALOG_DESCRIPTION = "The internal metadata used by Drill";
 
   /** Catalog connect string. Currently empty */
-  public static final String IS_CATALOG_CONNECT = "";
+   String IS_CATALOG_CONNECT = "";
 
   /** Name of information schema. */
-  public static final String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
-
-  // TODO:  Resolve how to not have two different place defining table names:
-  // NOTE: These string values have to match the identifiers for SelectedTable's
-  // enumerators.
-  // Information schema's tables' names:
-  public static final String TAB_CATALOGS = "CATALOGS";
-  public static final String TAB_COLUMNS = "COLUMNS";
-  public static final String TAB_SCHEMATA = "SCHEMATA";
-  public static final String TAB_TABLES = "TABLES";
-  public static final String TAB_VIEWS = "VIEWS";
+   String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
 
   // CATALOGS column names:
-  public static final String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
-  public static final String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
-  public static final String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
+   String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
+   String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
+   String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
 
   // SCHEMATA column names:
-  public static final String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
-  public static final String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
-  public static final String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
-  public static final String SCHS_COL_TYPE = "TYPE";
-  public static final String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
+   String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
+   String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
+   String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
+   String SCHS_COL_TYPE = "TYPE";
+   String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
 
   // Common TABLES / VIEWS / COLUMNS columns names:
-  public static final String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
-  public static final String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
-  public static final String SHRD_COL_TABLE_NAME = "TABLE_NAME";
+   String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
+   String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
+   String SHRD_COL_TABLE_NAME = "TABLE_NAME";
 
   // Remaining TABLES column names:
-  public static final String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
+   String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
 
   // Remaining VIEWS column names:
-  public static final String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
+   String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
 
   // COLUMNS columns, from SQL standard:
   // 1. TABLE_CATALOG
@@ -87,18 +74,31 @@ private InfoSchemaConstants() {
   // 17. CHARACTER_SET_CATALOG ...
 
   // Remaining COLUMNS column names:
-  public static final String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
-  public static final String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
-  public static final String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
-  public static final String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
-  public static final String COLS_COL_DATA_TYPE = "DATA_TYPE";
-  public static final String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
-  public static final String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
-  public static final String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
-  public static final String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
-  public static final String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
-  public static final String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
-  public static final String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
-  public static final String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
+   String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
+   String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
+   String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
+   String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
+   String COLS_COL_DATA_TYPE = "DATA_TYPE";
+   String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
+   String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
+   String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
+   String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
+   String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
+   String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
+   String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
+   String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
 
+  // FILES column names:
+   String FILES_COL_SCHEMA_NAME = SCHS_COL_SCHEMA_NAME;
+   String FILES_COL_ROOT_SCHEMA_NAME = "ROOT_SCHEMA_NAME";
+   String FILES_COL_WORKSPACE_NAME = "WORKSPACE_NAME";
+   String FILES_COL_FILE_NAME = "FILE_NAME";
+   String FILES_COL_RELATIVE_PATH = "RELATIVE_PATH";
+   String FILES_COL_IS_DIRECTORY = "IS_DIRECTORY";
+   String FILES_COL_IS_FILE = "IS_FILE";
+   String FILES_COL_LENGTH = "LENGTH";
+   String FILES_COL_OWNER = "OWNER";
+   String FILES_COL_GROUP = "GROUP";
+   String FILES_COL_PERMISSION = "PERMISSION";
+   String FILES_COL_MODIFICATION_TIME = "MODIFICATION_TIME";
 }
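
The switch from a final class to an interface works because fields declared in
a Java interface are implicitly public static final, so dropping the modifiers
leaves the constants unchanged. Together with InfoSchemaTableType, the enum
becomes the single source of table names, resolving the removed TODO. A
self-contained sketch (local names, not Drill's classes):

public class SingleSourceDemo {

  // Interface fields are implicitly public static final.
  interface ConstantsSketch {
    String IS_SCHEMA_NAME = "INFORMATION_SCHEMA";
  }

  // Enum identifiers double as the table names via name().
  enum TableType { CATALOGS, SCHEMATA, VIEWS, COLUMNS, TABLES, FILES }

  public static void main(String[] args) {
    // Assembled the way the rewritten handlers do it:
    System.out.println(ConstantsSketch.IS_SCHEMA_NAME + "." + TableType.FILES.name());
    // -> INFORMATION_SCHEMA.FILES
  }
}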
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
index f06ad84155d..40b4b8d20cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Lists;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
@@ -148,7 +150,9 @@ public ExprNode visitCastExpression(CastExpression e, Void value) throws Runtime
           || field.equals(SCHS_COL_SCHEMA_NAME)
           || field.equals(SHRD_COL_TABLE_NAME)
           || field.equals(SHRD_COL_TABLE_SCHEMA)
-          || field.equals(COLS_COL_COLUMN_NAME)) {
+          || field.equals(COLS_COL_COLUMN_NAME)
+          || field.equals(FILES_COL_ROOT_SCHEMA_NAME)
+          || field.equals(FILES_COL_WORKSPACE_NAME)) {
         return new FieldExprNode(field);
       }
     }
@@ -168,7 +172,9 @@ public ExprNode visitSchemaPath(SchemaPath path, Void value) throws RuntimeExcep
         || field.equals(SCHS_COL_SCHEMA_NAME)
         || field.equals(SHRD_COL_TABLE_NAME)
         || field.equals(SHRD_COL_TABLE_SCHEMA)
-        || field.equals(COLS_COL_COLUMN_NAME)) {
+        || field.equals(COLS_COL_COLUMN_NAME)
+        || field.equals(FILES_COL_ROOT_SCHEMA_NAME)
+        || field.equals(FILES_COL_WORKSPACE_NAME)) {
       return new FieldExprNode(field);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index d2c8c6f32f3..d41428bb863 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -20,14 +20,20 @@
 import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +48,7 @@
 import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
@@ -49,6 +56,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Generates records for POJO RecordReader by scanning the given schema. At every level (catalog, schema, table, field),
@@ -110,6 +120,9 @@ public boolean visitTable(String schemaName, String tableName, Table table) {
   public void visitField(String schemaName, String tableName, RelDataTypeField field) {
   }
 
+  public void visitFiles(String schemaName, SchemaPlus schema) {
+  }
+
   protected boolean shouldVisitCatalog() {
     if (filter == null) {
       return true;
@@ -189,6 +202,32 @@ protected boolean shouldVisitColumn(String schemaName, String tableName, String
     return filter.evaluate(recordValues) != Result.FALSE;
   }
 
+  protected boolean shouldVisitFiles(String schemaName, SchemaPlus schemaPlus) {
+    if (filter == null) {
+      return true;
+    }
+
+    AbstractSchema schema;
+    try {
+      schema = schemaPlus.unwrap(AbstractSchema.class);
+    } catch (ClassCastException e) {
+      return false;
+    }
+
+    if (!(schema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
+      return false;
+    }
+
+    WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
+
+    Map<String, String> recordValues = new HashMap<>();
+    recordValues.put(FILES_COL_SCHEMA_NAME, schemaName);
+    recordValues.put(FILES_COL_ROOT_SCHEMA_NAME, wsSchema.getSchemaPath().get(0));
+    recordValues.put(FILES_COL_WORKSPACE_NAME, wsSchema.getName());
+
+    return filter.evaluate(recordValues) != Result.FALSE;
+  }
+
   public abstract PojoRecordReader<S> getRecordReader();
 
   public void scanSchema(SchemaPlus root) {
@@ -207,7 +246,7 @@ private void scanSchema(String schemaPath, SchemaPlus schema) {
     // Recursively scan any subschema.
     for (String name: schema.getSubSchemaNames()) {
       scanSchema(schemaPath +
-          (schemaPath == "" ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
+          ("".equals(schemaPath) ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
           name, schema.getSubSchema(name));
     }
 
@@ -215,6 +254,10 @@ private void scanSchema(String schemaPath, SchemaPlus schema) {
     if (shouldVisitSchema(schemaPath, schema) && visitSchema(schemaPath, schema)) {
       visitTables(schemaPath, schema);
     }
+
+    if (shouldVisitFiles(schemaPath, schema)) {
+      visitFiles(schemaPath, schema);
+    }
   }
 
   /**
@@ -256,7 +299,7 @@ public Catalogs(OptionManager optionManager) {
 
     @Override
     public boolean visitCatalog() {
-      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCR, IS_CATALOG_CONNECT));
+      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCRIPTION, IS_CATALOG_CONNECT));
       return false;
     }
   }
@@ -316,7 +359,6 @@ private void visitTableWithType(String schemaName, String tableName, TableType t
           .checkNotNull(type, "Error. Type information for table %s.%s provided is null.", schemaName,
               tableName);
       records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, type.toString()));
-      return;
     }
 
     @Override
@@ -371,4 +413,39 @@ public void visitField(String schemaName, String tableName, RelDataTypeField fie
       records.add(new Records.Column(IS_CATALOG_NAME, schemaName, tableName, field));
     }
   }
+
+  public static class Files extends InfoSchemaRecordGenerator<Records.File> {
+
+    List<Records.File> records = new ArrayList<>();
+
+    public Files(OptionManager optionManager) {
+      super(optionManager);
+    }
+
+    @Override
+    public PojoRecordReader<Records.File> getRecordReader() {
+      return new PojoRecordReader<>(Records.File.class, records);
+    }
+
+    @Override
+    public void visitFiles(String schemaName, SchemaPlus schemaPlus) {
+      try {
+        AbstractSchema schema = schemaPlus.unwrap(AbstractSchema.class);
+        if (schema instanceof WorkspaceSchemaFactory.WorkspaceSchema) {
+          WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
+          String defaultLocation = wsSchema.getDefaultLocation();
+          FileSystem fs = wsSchema.getFS();
+          boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
+          FileSystemUtil.listAll(fs, new Path(defaultLocation), recursive).forEach(
+              fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
+          );
+        }
+      } catch (ClassCastException | UnsupportedOperationException e) {
+        // ignore the exception since either this is not a Drill schema or schema does not support files listing
+      } catch (IOException e) {
+        logger.warn("Failure while trying to list files", e);
+      }
+    }
+  }
+
 }
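
One small but real bug fix above: in scanSchema the check schemaPath == ""
compared references rather than contents and only worked when the string
happened to be the interned literal; "".equals(schemaPath) compares contents
and is also null-safe. A minimal demonstration:

public class StringEqualityDemo {
  public static void main(String[] args) {
    String built = new String(""); // same content as "", different object
    System.out.println("" == built);      // false: reference comparison
    System.out.println("".equals(built)); // true: content comparison
  }
}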
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
index 64b8c421bf5..c6ebfcc5731 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
@@ -33,6 +33,18 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION_RADIX;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_SCALE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_ORDINAL_POSITION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_FILE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_GROUP;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_IS_DIRECTORY;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_IS_FILE;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_LENGTH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_MODIFICATION_TIME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_OWNER;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_PERMISSION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_RELATIVE_PATH;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_IS_MUTABLE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
@@ -41,11 +53,6 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_CATALOG;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_CATALOGS;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_COLUMNS;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_SCHEMATA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_TABLES;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TAB_VIEWS;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.VIEWS_COL_VIEW_DEFINITION;
 
@@ -63,8 +70,7 @@
 import com.google.common.collect.Lists;
 
 /**
- * Base class for tables in INFORMATION_SCHEMA.  Defines the table (fields and
- * types).
+ * Base class for tables in INFORMATION_SCHEMA.  Defines the table (fields and types).
  */
 public abstract class InfoSchemaTable<S> {
 
@@ -80,30 +86,17 @@ public static Field create(String name, MajorType type) {
     }
   }
 
-  public static final MajorType VARCHAR = Types.required(MinorType.VARCHAR);
   public static final MajorType INT = Types.required(MinorType.INT);
+  public static final MajorType BIGINT = Types.required(MinorType.BIGINT);
+  public static final MajorType VARCHAR = Types.required(MinorType.VARCHAR);
+  public static final MajorType BIT = Types.required(MinorType.BIT);
 
-  private final String tableName;
   private final List<Field> fields;
 
-  public InfoSchemaTable(String tableName, List<Field> fields) {
-    this.tableName = tableName;
+  public InfoSchemaTable(List<Field> fields) {
     this.fields = fields;
   }
 
-  static public RelDataType getRelDataType(RelDataTypeFactory typeFactory, MajorType type) {
-    switch (type.getMinorType()) {
-    case INT:
-      return typeFactory.createSqlType(SqlTypeName.INTEGER);
-    case VARCHAR:
-      // Note:  Remember to not default to "VARCHAR(1)":
-      return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
-    default:
-      throw new UnsupportedOperationException(
-          "Only INT and VARCHAR types are supported in INFORMATION_SCHEMA");
-    }
-  }
-
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
 
     // Convert the array of Drill types to an array of Optiq types
@@ -117,10 +110,26 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
     return typeFactory.createStructType(relTypes, fieldNames);
   }
 
+  private RelDataType getRelDataType(RelDataTypeFactory typeFactory, MajorType type) {
+    switch (type.getMinorType()) {
+      case INT:
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+      case BIGINT:
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+      case VARCHAR:
+        // Note:  Remember to not default to "VARCHAR(1)":
+        return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+      case BIT:
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+      default:
+        throw new UnsupportedOperationException("Only INT, BIGINT, VARCHAR and BOOLEAN types are supported in " + InfoSchemaConstants.IS_SCHEMA_NAME);
+    }
+  }
+
   public abstract InfoSchemaRecordGenerator<S> getRecordGenerator(OptionManager optionManager);
 
   /** Layout for the CATALOGS table. */
-  static public class Catalogs extends InfoSchemaTable<Records.Catalog> {
+  public static class Catalogs extends InfoSchemaTable<Records.Catalog> {
     // NOTE:  Nothing seems to verify that the types here (apparently used
     // by SQL validation) match the types of the fields in Records.Catalogs).
     private static final List<Field> fields = ImmutableList.of(
@@ -128,8 +137,8 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         Field.create(CATS_COL_CATALOG_DESCRIPTION, VARCHAR),
         Field.create(CATS_COL_CATALOG_CONNECT, VARCHAR));
 
-    Catalogs() {
-      super(TAB_CATALOGS, fields);
+    public Catalogs() {
+      super(fields);
     }
 
     @Override
@@ -150,7 +159,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         Field.create(SCHS_COL_IS_MUTABLE, VARCHAR));
 
     public Schemata() {
-      super(TAB_SCHEMATA, fields);
+      super(fields);
     }
 
     @Override
@@ -170,7 +179,7 @@ public Schemata() {
         Field.create(TBLS_COL_TABLE_TYPE, VARCHAR));
 
     public Tables() {
-      super(TAB_TABLES, fields);
+      super(fields);
     }
 
     @Override
@@ -180,7 +189,7 @@ public Tables() {
   }
 
   /** Layout for the VIEWS table. */
-  static public class Views extends InfoSchemaTable<Records.View> {
+  public static class Views extends InfoSchemaTable<Records.View> {
     // NOTE:  Nothing seems to verify that the types here (apparently used
     // by SQL validation) match the types of the fields in Records.Views).
     private static final List<Field> fields = ImmutableList.of(
@@ -190,7 +199,7 @@ public Tables() {
         Field.create(VIEWS_COL_VIEW_DEFINITION, VARCHAR));
 
     public Views() {
-      super(TAB_VIEWS, fields);
+      super(fields);
     }
 
     @Override
@@ -242,7 +251,7 @@ public Views() {
         );
 
     public Columns() {
-      super(TAB_COLUMNS, fields);
+      super(fields);
     }
 
     @Override
@@ -250,4 +259,33 @@ public Columns() {
       return new InfoSchemaRecordGenerator.Columns(optionManager);
     }
   }
+
+  /** Layout for the FILES table. */
+  public static class Files extends InfoSchemaTable<Records.File> {
+
+    private static final List<Field> fields = ImmutableList.of(
+        Field.create(FILES_COL_SCHEMA_NAME, VARCHAR),
+        Field.create(FILES_COL_ROOT_SCHEMA_NAME, VARCHAR),
+        Field.create(FILES_COL_WORKSPACE_NAME, VARCHAR),
+        Field.create(FILES_COL_FILE_NAME, VARCHAR),
+        Field.create(FILES_COL_RELATIVE_PATH, VARCHAR),
+        Field.create(FILES_COL_IS_DIRECTORY, BIT),
+        Field.create(FILES_COL_IS_FILE, BIT),
+        Field.create(FILES_COL_LENGTH, BIGINT),
+        Field.create(FILES_COL_OWNER, VARCHAR),
+        Field.create(FILES_COL_GROUP, VARCHAR),
+        Field.create(FILES_COL_PERMISSION, VARCHAR),
+        Field.create(FILES_COL_MODIFICATION_TIME, VARCHAR)
+    );
+
+    public Files() {
+      super(fields);
+    }
+
+    @Override
+    public InfoSchemaRecordGenerator<Records.File> getRecordGenerator(OptionManager optionManager) {
+      return new InfoSchemaRecordGenerator.Files(optionManager);
+    }
+  }
+
 }
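
The widened type switch is what lets the new FILES table expose IS_DIRECTORY
and IS_FILE as BOOLEAN (Drill's BIT) and LENGTH as BIGINT. A stand-alone
sketch of the same mapping, using a hypothetical enum rather than Drill's
MinorType and Calcite's SqlTypeName:

public class TypeMappingSketch {

  enum MinorType { INT, BIGINT, VARCHAR, BIT }

  // Mirrors the getRelDataType switch above, rendered as SQL type names.
  static String toSqlType(MinorType type) {
    switch (type) {
      case INT:     return "INTEGER";
      case BIGINT:  return "BIGINT";
      case VARCHAR: return "VARCHAR(" + Integer.MAX_VALUE + ")"; // never default to VARCHAR(1)
      case BIT:     return "BOOLEAN";
      default:      throw new UnsupportedOperationException("Unsupported type: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(toSqlType(MinorType.BIT));    // BOOLEAN
    System.out.println(toSqlType(MinorType.BIGINT)); // BIGINT
  }
}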
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
index 37d1a6b046a..961b90d5509 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Catalogs;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Columns;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Files;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Schemata;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Tables;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Views;
@@ -32,14 +33,13 @@
  * The set of tables/views in INFORMATION_SCHEMA.
  */
 public enum InfoSchemaTableType {
-  // TODO:  Resolve how to not have two different place defining table names:
-  // NOTE: These identifiers have to match the string values in
-  // InfoSchemaConstants.
+
   CATALOGS(new Catalogs()),
   SCHEMATA(new Schemata()),
   VIEWS(new Views()),
   COLUMNS(new Columns()),
-  TABLES(new Tables());
+  TABLES(new Tables()),
+  FILES(new Files());
 
   private final InfoSchemaTable<?> tableDef;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
index c684e7a52fe..663229678da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
@@ -24,10 +24,17 @@
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
 import com.google.common.base.MoreObjects;
 
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
 public class Records {
 
   /** Pojo object for a record in INFORMATION_SCHEMA.TABLES */
@@ -543,4 +550,39 @@ public Schema(String catalog, String name, String owner, String type, boolean is
       this.IS_MUTABLE = isMutable ? "YES" : "NO";
     }
   }
-}
+
+  /** Pojo object for a record in INFORMATION_SCHEMA.FILES */
+  public static class File {
+
+    public final String SCHEMA_NAME;
+    public final String ROOT_SCHEMA_NAME;
+    public final String WORKSPACE_NAME;
+    public final String FILE_NAME;
+    public final String RELATIVE_PATH;
+    public final boolean IS_DIRECTORY;
+    public final boolean IS_FILE;
+    public final long LENGTH;
+    public final String OWNER;
+    public final String GROUP;
+    public final String PERMISSION;
+    public final String MODIFICATION_TIME;
+
+    public File(String schemaName, WorkspaceSchemaFactory.WorkspaceSchema wsSchema, FileStatus fileStatus) {
+      this.SCHEMA_NAME = schemaName;
+      this.ROOT_SCHEMA_NAME = wsSchema.getSchemaPath().get(0);
+      this.WORKSPACE_NAME = wsSchema.getName();
+      this.FILE_NAME = fileStatus.getPath().getName();
+      this.RELATIVE_PATH = Path.getPathWithoutSchemeAndAuthority(new Path(wsSchema.getDefaultLocation())).toUri()
+        .relativize(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toUri()).getPath();
+      this.IS_DIRECTORY = fileStatus.isDirectory();
+      this.IS_FILE = fileStatus.isFile();
+      this.LENGTH = fileStatus.getLen();
+      this.OWNER = fileStatus.getOwner();
+      this.GROUP = fileStatus.getGroup();
+      this.PERMISSION = fileStatus.getPermission().toString();
+      this.MODIFICATION_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+          .withZone(ZoneOffset.UTC)
+          .format(Instant.ofEpochMilli(fileStatus.getModificationTime()));
+    }
+  }
+}
\ No newline at end of file
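
Two details of Records.File are easy to verify in isolation: RELATIVE_PATH is
the file path relativized against the workspace root (the real code first
strips scheme and authority via Hadoop's Path before calling URI.relativize),
and MODIFICATION_TIME is epoch millis rendered in UTC with java.time. A
pure-JDK sketch with illustrative paths and timestamp:

import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class FileRecordSketch {
  public static void main(String[] args) {
    // RELATIVE_PATH: relativize the file path against the workspace root
    URI workspaceRoot = URI.create("/tmp/");
    URI filePath = URI.create("/tmp/specific_directory/sample.txt");
    System.out.println(workspaceRoot.relativize(filePath).getPath());
    // -> specific_directory/sample.txt

    // MODIFICATION_TIME: epoch millis formatted in UTC
    String modificationTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
        .withZone(ZoneOffset.UTC)
        .format(Instant.ofEpochMilli(1534347761000L));
    System.out.println(modificationTime); // -> 2018-08-15 15:42:41.000
  }
}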
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 16a285beac2..06c6978d199 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -605,4 +605,5 @@ drill.exec.options: {
     store.kafka.poll.timeout: 200,
     web.logs.max_lines: 10000,
     window.enable: true,
+    storage.list_files_recursively: false
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index 052b761f4fb..6313d74a048 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -129,8 +129,6 @@ public void testNonHomogenousDrop() throws Exception {
     final String nestedJsonTable = tableName + Path.SEPARATOR + "json_table";
     test(CREATE_SIMPLE_TABLE, BACK_TICK + nestedJsonTable + BACK_TICK);
 
-    test("show files from " + tableName);
-
     boolean dropFailed = false;
     // this should fail, because the directory contains non-homogenous files
     try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index 2b112e2cb44..3b25ddbfddf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -41,6 +41,7 @@
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class BaseTestImpersonation extends PlanTestBase {
   protected static final String MINIDFS_STORAGE_PLUGIN_NAME = "miniDfsPlugin";
@@ -137,9 +138,19 @@ protected static void addMiniDfsBasedStorage(final Map<String, WorkspaceConfig>
 
   protected static void createAndAddWorkspace(String name, String path, short permissions, String owner,
       String group, final Map<String, WorkspaceConfig> workspaces) throws Exception {
-    final Path dirPath = new Path(path);
-    FileSystem.mkdirs(fs, dirPath, new FsPermission(permissions));
+
+    FsPermission permission = new FsPermission(permissions);
+
+    Path dirPath = new Path(path);
+    assertTrue(FileSystem.mkdirs(fs, dirPath, permission));
     fs.setOwner(dirPath, owner, group);
+
+    // create sample file in the workspace to check show files command
+    Path sampleFile = new Path(dirPath, String.format("sample_%s.txt", name));
+    assertTrue(fs.createNewFile(sampleFile));
+    fs.setPermission(sampleFile, permission);
+    fs.setOwner(sampleFile, owner, group);
+
     final WorkspaceConfig ws = new WorkspaceConfig(path, true, "parquet", false);
     workspaces.put(name, ws);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index 2eb55dbaaac..08c09d186cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -23,6 +23,7 @@
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.categories.SlowTest;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@
 import java.util.Map;
 
 import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -154,37 +156,34 @@ public void testShowFilesInWSWithUserAndGroupPermissionsForQueryUser() throws Ex
     updateClient(user1);
 
     // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-    test("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME);
+    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
 
-    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for
-    // "user1"
-    test("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME);
+    // Try show tables in schema "drillTestGrp0_750" which is owned by "processUser" and has group permissions for "user1"
+    count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_750", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithOtherPermissionsForQueryUser() throws Exception {
     updateClient(user2);
-    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part
-    // of the "group0"
-    test("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME);
+    // Try show tables in schema "drillTestGrp0_755" which is owned by "processUser" and group0. "user2" is not part of the "group0"
+    int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp0_755", MINIDFS_STORAGE_PLUGIN_NAME));
+    assertTrue(count > 0);
   }
 
   @Test
   public void testShowFilesInWSWithNoPermissionsForQueryUser() throws Exception {
-    UserRemoteException ex = null;
-
     updateClient(user2);
+
     try {
+      setSessionOption(ExecConstants.LIST_FILES_RECURSIVELY, true);
       // Try show tables in schema "drillTestGrp1_700" which is owned by "user1"
-      test("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME);
-    } catch(UserRemoteException e) {
-      ex = e;
+      int count = testSql(String.format("SHOW FILES IN %s.drillTestGrp1_700", MINIDFS_STORAGE_PLUGIN_NAME));
+      assertEquals("Counts should match", 0, count);
+    } finally {
+      resetSessionOption(ExecConstants.LIST_FILES_RECURSIVELY);
     }
-
-    assertNotNull("UserRemoteException is expected", ex);
-    assertThat(ex.getMessage(),
-        containsString("Permission denied: user=drillTestUser2, " +
-            "access=READ_EXECUTE, inode=\"/drillTestGrp1_700\":drillTestUser1:drillTestGrp1:drwx------"));
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 3932d7e6b51..6e7d05406a9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -69,6 +69,7 @@ public void selectFromAllTables() throws Exception{
     test("select * from INFORMATION_SCHEMA.VIEWS");
     test("select * from INFORMATION_SCHEMA.`TABLES`");
     test("select * from INFORMATION_SCHEMA.COLUMNS");
+    test("select * from INFORMATION_SCHEMA.`FILES`");
   }
 
   @Test
@@ -89,7 +90,8 @@ public void showTablesFromDb() throws Exception{
             new String[] { "INFORMATION_SCHEMA", "COLUMNS" },
             new String[] { "INFORMATION_SCHEMA", "TABLES" },
             new String[] { "INFORMATION_SCHEMA", "CATALOGS" },
-            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" }
+            new String[] { "INFORMATION_SCHEMA", "SCHEMATA" },
+            new String[] { "INFORMATION_SCHEMA", "FILES" }
         );
 
     final TestBuilder t1 = testBuilder()
@@ -363,18 +365,6 @@ public void completeSchemaRef1() throws Exception {
     test("SELECT * FROM `cp.default`.`employee.json` limit 2");
   }
 
-  @Test
-  public void showFiles() throws Exception {
-    test("show files from dfs.`%s`", TEST_SUB_DIR);
-    test("show files from `dfs.default`.`%s`", TEST_SUB_DIR);
-  }
-
-  @Test
-  public void showFilesWithDefaultSchema() throws Exception{
-    test("USE dfs.`default`");
-    test("SHOW FILES FROM `%s`", TEST_SUB_DIR);
-  }
-
   @Test
   public void describeSchemaSyntax() throws Exception {
     test("describe schema dfs");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java
new file mode 100644
index 00000000000..f8ea9a13d3e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestFilesTable.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(SqlTest.class)
+public class TestFilesTable extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+
+    // create one workspace named "files"
+    File filesWorkspace = cluster.makeDataDir("files", null, null);
+
+    // add data to the workspace: one file and a folder containing one file
+    assertTrue(new File(filesWorkspace, "file1.txt").createNewFile());
+    File folder = new File(filesWorkspace, "folder");
+    assertTrue(folder.mkdir());
+    assertTrue(new File(folder, "file2.txt").createNewFile());
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSelectWithoutRecursion() throws Exception {
+    client.testBuilder()
+        .sqlQuery("select schema_name, root_schema_name, workspace_name, file_name, relative_path, is_directory, is_file from INFORMATION_SCHEMA.`FILES`")
+        .unOrdered()
+        .baselineColumns("schema_name", "root_schema_name", "workspace_name", "file_name", "relative_path", "is_directory", "is_file")
+        .baselineValues("dfs.files", "dfs", "files", "file1.txt", "file1.txt", false, true)
+        .baselineValues("dfs.files", "dfs", "files", "folder", "folder", true, false)
+        .go();
+  }
+
+  @Test
+  public void testSelectWithRecursion() throws Exception {
+    try {
+      client.alterSession(ExecConstants.LIST_FILES_RECURSIVELY, true);
+      client.testBuilder()
+          .sqlQuery("select schema_name, root_schema_name, workspace_name, file_name, relative_path, is_directory, is_file from INFORMATION_SCHEMA.`FILES`")
+          .unOrdered()
+          .baselineColumns("schema_name", "root_schema_name", "workspace_name", "file_name", "relative_path", "is_directory", "is_file")
+          .baselineValues("dfs.files", "dfs", "files", "file1.txt", "file1.txt", false, true)
+          .baselineValues("dfs.files", "dfs", "files", "folder", "folder", true, false)
+          .baselineValues("dfs.files", "dfs", "files", "file2.txt", "folder/file2.txt", false, true)
+          .go();
+    } finally {
+      client.resetSession(ExecConstants.LIST_FILES_RECURSIVELY);
+    }
+  }
+
+  @Test
+  public void testShowFilesWithInCondition() throws Exception {
+    client.testBuilder()
+        .sqlQuery("show files in dfs.`files`")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesForSpecificFolderSuccess() throws Exception {
+    try {
+      client.alterSession(ExecConstants.LIST_FILES_RECURSIVELY, true);
+      client.testBuilder()
+          .sqlQuery("show files in dfs.`files`.folder")
+          .unOrdered()
+          .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files' and relative_path like 'folder/%'")
+          .go();
+    } finally {
+      client.resetSession(ExecConstants.LIST_FILES_RECURSIVELY);
+    }
+  }
+
+  @Test
+  public void testShowFilesForSpecificFolderFailure() throws Exception {
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(String.format("To SHOW FILES in specific directory, enable option %s", ExecConstants.LIST_FILES_RECURSIVELY));
+    queryBuilder().sql("show files in dfs.`files`.folder").run();
+  }
+
+  @Test
+  public void testShowFilesWithUseClause() throws Exception {
+    queryBuilder().sql("use dfs.`files`").run();
+    client.testBuilder()
+        .sqlQuery("show files")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesWithPartialUseClause() throws Exception {
+    queryBuilder().sql("use dfs").run();
+    client.testBuilder()
+        .sqlQuery("show files in `files`")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'")
+        .go();
+  }
+
+  @Test
+  public void testShowFilesForDefaultSchema() throws Exception {
+    queryBuilder().sql("use dfs").run();
+    client.testBuilder()
+        .sqlQuery("show files")
+        .unOrdered()
+        .sqlBaselineQuery("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.default'")
+        .go();
+  }
+
+  @Test
+  public void testFilterPushDown_None() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where file_name = 'file1.txt'").explainText();
+    assertTrue(plan.contains("filter=null"));
+    assertTrue(plan.contains("Filter(condition="));
+  }
+
+  @Test
+  public void testFilterPushDown_Partial() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files' and file_name = 'file1.txt'").explainText();
+    assertTrue(plan.contains("filter=booleanand(equal(Field=SCHEMA_NAME,Literal=dfs.files))"));
+    assertTrue(plan.contains("Filter(condition="));
+  }
+
+  @Test
+  public void testFilterPushDown_Full() throws Exception {
+    String plan = queryBuilder().sql("select * from INFORMATION_SCHEMA.`FILES` where schema_name = 'dfs.files'").explainText();
+    assertTrue(plan.contains("filter=equal(Field=SCHEMA_NAME,Literal=dfs.files)"));
+    assertFalse(plan.contains("Filter(condition="));
+  }
+
+}
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index 45744256c2f..8d34ebc8a9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.work.metadata;
 
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCR;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,7 +61,7 @@ public void catalogs() throws Exception {
 
     CatalogMetadata c = catalogs.get(0);
     assertEquals(IS_CATALOG_NAME, c.getCatalogName());
-    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_DESCRIPTION, c.getDescription());
     assertEquals(IS_CATALOG_CONNECT, c.getConnect());
   }
 
@@ -78,7 +78,7 @@ public void catalogsWithFilter() throws Exception {
 
     CatalogMetadata c = catalogs.get(0);
     assertEquals(IS_CATALOG_NAME, c.getCatalogName());
-    assertEquals(IS_CATALOG_DESCR, c.getDescription());
+    assertEquals(IS_CATALOG_DESCRIPTION, c.getDescription());
     assertEquals(IS_CATALOG_CONNECT, c.getConnect());
   }
 
@@ -149,13 +149,14 @@ public void tables() throws Exception {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(17, tables.size());
+    assertEquals(18, tables.size());
 
     verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
     verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
     verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
     verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
     verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -186,13 +187,14 @@ public void tablesWithSystemTableFilter() throws Exception {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(17, tables.size());
+    assertEquals(18, tables.size());
 
     verifyTable("INFORMATION_SCHEMA", "CATALOGS", tables);
     verifyTable("INFORMATION_SCHEMA", "COLUMNS", tables);
     verifyTable("INFORMATION_SCHEMA", "SCHEMATA", tables);
     verifyTable("INFORMATION_SCHEMA", "TABLES", tables);
     verifyTable("INFORMATION_SCHEMA", "VIEWS", tables);
+    verifyTable("INFORMATION_SCHEMA", "FILES", tables);
     verifyTable("sys", "boot", tables);
     verifyTable("sys", "drillbits", tables);
     verifyTable("sys", "memory", tables);
@@ -248,7 +250,7 @@ public void columns() throws Exception {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(118, columns.size());
+    assertEquals(130, columns.size());
     // too many records to verify the output.
   }
 
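
The 118 -> 130 column-count bump is consistent with the new table: the File row constructor at the top of this section populates exactly 12 FILES columns, and 118 + 12 = 130. A trivial sanity check (column list copied from that constructor; the arithmetic is not asserted anywhere in the patch itself):

    public class FilesColumnCount {
      public static void main(String[] args) {
        // The 12 fields set in the File row class earlier in this diff.
        String[] filesColumns = {
            "SCHEMA_NAME", "ROOT_SCHEMA_NAME", "WORKSPACE_NAME", "FILE_NAME",
            "RELATIVE_PATH", "IS_DIRECTORY", "IS_FILE", "LENGTH",
            "OWNER", "GROUP", "PERMISSION", "MODIFICATION_TIME"
        };
        System.out.println(118 + filesColumns.length); // 130
      }
    }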