Posted to commits@impala.apache.org by to...@apache.org on 2019/04/29 18:18:48 UTC

[impala] branch master updated: IMPALA-8454 (part 2): Initial support for recursive file listing within a partition

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ced916  IMPALA-8454 (part 2): Initial support for recursive file listing within a partition
5ced916 is described below

commit 5ced9160bd65e5a72d739a7a5c548add2dbc4b84
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Wed Apr 10 23:29:53 2019 -0700

    IMPALA-8454 (part 2): Initial support for recursive file listing within a partition
    
    This adds support to FileMetadataLoader to recursively list a directory
    and create file descriptors. The changes are as follows:
    
    * FileMetadataLoader can now take a 'recursive' argument to trigger the
      new behavior. All non-test code paths still pass 'recursive=false'
      (i.e. this new feature isn't exposed for real tables yet; a usage
      sketch follows below).
    
    * FileSystemUtil has some functionality for recursive directory listing.
      There are a few notes there about the differing performance
      characteristics of recursive listing on S3 vs HDFS (a short
      consumption sketch follows the file summary below).
    
    * Renamed the 'file_name' field to 'relative_path' for FileDescriptor
      and HDFS splits, since the stored path may now span more than a
      single path component.
    
    The new functionality is covered only by unit tests at the moment. Later,
    it will be tied into the actual table code paths to solve issues with
    Hive interop, along with end-to-end tests.
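
    A minimal usage sketch (adapted from the new FileMetadataLoaderTest in this
    patch, with imports as in that test; the warehouse path and example output
    are illustrative only):

        ListMap<TNetworkAddress> hostIndex = new ListMap<>();
        Path partDir = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
        FileMetadataLoader fml = new FileMetadataLoader(partDir, /* recursive= */true,
            /* oldFds= */Collections.emptyList(), hostIndex);
        fml.load();
        for (FileDescriptor fd : fml.getLoadedFds()) {
          // Paths are relative to the partition dir and may span multiple
          // components, e.g. "year=2009/month=1/090101.txt".
          System.out.println(fd.getRelativePath());
        }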
    
    Change-Id: I9b151d7abb8443c0d9de0a0d82a9f13e07ad5109
    Reviewed-on: http://gerrit.cloudera.org:8080/12991
    Tested-by: Todd Lipcon <to...@apache.org>
    Reviewed-by: Todd Lipcon <to...@apache.org>
---
 be/src/exec/hdfs-scan-node-base.cc                 |  2 +-
 be/src/scheduling/scheduler-test-util.cc           |  6 +-
 be/src/scheduling/scheduler.cc                     |  6 +-
 common/fbs/CatalogObjects.fbs                      |  9 +-
 common/thrift/PlanNodes.thrift                     |  4 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  |  2 +-
 .../apache/impala/catalog/FileMetadataLoader.java  | 46 +++++++----
 .../org/apache/impala/catalog/HdfsPartition.java   | 44 ++++++----
 .../java/org/apache/impala/catalog/HdfsTable.java  |  6 +-
 .../impala/catalog/local/DirectMetaProvider.java   |  1 +
 .../org/apache/impala/common/FileSystemUtil.java   | 96 +++++++++++++++++++++-
 .../org/apache/impala/planner/HdfsScanNode.java    |  6 +-
 .../impala/catalog/FileMetadataLoaderTest.java     | 88 ++++++++++++++++++++
 .../apache/impala/catalog/HdfsPartitionTest.java   |  4 +-
 .../org/apache/impala/planner/ExplainTest.java     |  2 +-
 .../org/apache/impala/planner/PlannerTestBase.java |  2 +-
 .../apache/impala/testutil/BlockIdGenerator.java   |  2 +-
 17 files changed, 267 insertions(+), 59 deletions(-)
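
A short sketch of how the new recursive FileSystemUtil.listStatus() entry point
is consumed (mirroring FileMetadataLoader in the diff below; 'partDir' and the
Configuration are placeholders, and imports follow FileSystemUtil):

    // Recursively enumerate files under a partition directory without
    // prefetching block locations.
    FileSystem fs = partDir.getFileSystem(new Configuration());
    RemoteIterator<? extends FileStatus> it =
        FileSystemUtil.listStatus(fs, partDir, /* recursive= */true);
    if (it != null) {  // listStatus() returns null if the path does not exist
      while (it.hasNext()) {
        FileStatus stat = it.next();
        System.out.println(stat.getPath());  // only files are yielded, not directories
      }
    }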

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 966bb58..15aa4f9 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -207,7 +207,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     }
 
     filesystem::path file_path(partition_desc->location());
-    file_path.append(split.file_name, filesystem::path::codecvt());
+    file_path.append(split.relative_path, filesystem::path::codecvt());
     const string& native_file_path = file_path.native();
 
     auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index b1534da..9916cbc 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -273,7 +273,7 @@ void Plan::BuildScanRange(const TableName& table_name, const Block& block, int b
   // 'length' is the only member considered by the scheduler.
   file_split.length = block.length;
   // Encoding the table name and block index in the file helps debugging.
-  file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
+  file_split.relative_path = table_name + "_block_" + std::to_string(block_idx);
   file_split.offset = 0;
   file_split.partition_id = 0;
   // For now, we model each file by a single block.
@@ -289,9 +289,9 @@ void Plan::BuildScanRangeSpec(const TableName& table_name,
   THdfsFileDesc thrift_file;
 
   flatbuffers::FlatBufferBuilder fb_builder;
-  auto file_name =
+  auto rel_path =
       fb_builder.CreateString(table_name + "_spec_" + std::to_string(spec_idx));
-  auto fb_file_desc = CreateFbFileDesc(fb_builder, file_name, spec.length);
+  auto fb_file_desc = CreateFbFileDesc(fb_builder, rel_path, spec.length);
   fb_builder.Finish(fb_file_desc);
 
   string buffer(
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 79074ba..18a5eb6 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -256,7 +256,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       RETURN_IF_ERROR(FromFbCompression(fb_desc->compression(), &compression));
       hdfs_scan_range.__set_file_compression(compression);
       hdfs_scan_range.__set_file_length(fb_desc->length());
-      hdfs_scan_range.__set_file_name(fb_desc->file_name()->str());
+      hdfs_scan_range.__set_relative_path(fb_desc->relative_path()->str());
       hdfs_scan_range.__set_length(scan_range_length);
       hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
       hdfs_scan_range.__set_offset(scan_range_offset);
@@ -921,8 +921,8 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
   // Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
   // Note: This hashes both the filename and the offset to allow very large files
   // to be spread across more executors.
-  uint32_t hash = HashUtil::Hash(hdfs_file_split->file_name.data(),
-      hdfs_file_split->file_name.length(), 0);
+  uint32_t hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
+      hdfs_file_split->relative_path.length(), 0);
   hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
   pcg32 prng(hash);
   // To avoid any problem scenarios, limit the total number of iterations
diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
index d320dfa..3cf20d2 100644
--- a/common/fbs/CatalogObjects.fbs
+++ b/common/fbs/CatalogObjects.fbs
@@ -55,10 +55,11 @@ table FbFileBlock {
 }
 
 table FbFileDesc {
-  // The name of the file (not the full path). The parent path is assumed to be the
-  // 'location' of the Partition this file resides within.
-  // TODO: Investigate the use of prefix-based compression for file names.
-  file_name: string (id: 0);
+  // The path of the file relative to the 'location' of the Partition this file
+  // resides within.
+  // TODO: Investigate better compression of filenames which contain common
+  // components.
+  relative_path: string (id: 0);
 
   // The total length of the file, in bytes.
   length: long (id: 1);
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 9ae9d88..f188c72 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -167,9 +167,9 @@ struct TRuntimeFilterDesc {
 
 // Specification of subsection of a single hdfs file.
 struct THdfsFileSplit {
-  // File name (not the full path).  The path is assumed to be the
+  // Path of the file, relative to the
   // 'location' of the THdfsPartition referenced by partition_id.
-  1: required string file_name
+  1: required string relative_path
 
   // starting offset
   2: required i64 offset
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index dd1699c..5fb50cc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -264,7 +264,7 @@ public interface FeFsTable extends FeTable {
         Collections.sort(orderedFds);
         for (FileDescriptor fd: orderedFds) {
           TResultRowBuilder rowBuilder = new TResultRowBuilder();
-          rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
+          rowBuilder.add(p.getLocation() + "/" + fd.getRelativePath());
           rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
           rowBuilder.add(p.getPartitionName());
           result.addToRows(rowBuilder.get());
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index d0f71b5..5be2341 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -17,6 +17,7 @@
 package org.apache.impala.catalog;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -49,7 +50,8 @@ public class FileMetadataLoader {
   private static final Configuration CONF = new Configuration();
 
   private final Path partDir_;
-  private final ImmutableMap<String, FileDescriptor> oldFdsByName_;
+  private final boolean recursive_;
+  private final ImmutableMap<String, FileDescriptor> oldFdsByRelPath_;
   private final ListMap<TNetworkAddress> hostIndex_;
 
   private boolean forceRefreshLocations = false;
@@ -59,15 +61,17 @@ public class FileMetadataLoader {
 
   /**
    * @param partDir the dir for which to fetch file metadata
+   * @param recursive whether to recursively list files
    * @param oldFds any pre-existing file descriptors loaded for this table, used
    *   to optimize refresh if available.
    * @param hostIndex the host index with which to associate the file descriptors
    */
-  public FileMetadataLoader(Path partDir, List<FileDescriptor> oldFds,
+  public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds,
       ListMap<TNetworkAddress> hostIndex) {
     partDir_ = Preconditions.checkNotNull(partDir);
+    recursive_ = recursive;
     hostIndex_ = Preconditions.checkNotNull(hostIndex);
-    oldFdsByName_ = Maps.uniqueIndex(oldFds, FileDescriptor::getFileName);
+    oldFdsByRelPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getRelativePath);
   }
 
   /**
@@ -121,19 +125,19 @@ public class FileMetadataLoader {
     // assume that most _can_ be reused, in which case it's faster to _not_ prefetch
     // the locations.
     boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) &&
-        (oldFdsByName_.isEmpty() || forceRefreshLocations);
+        (oldFdsByRelPath_.isEmpty() || forceRefreshLocations);
 
     String msg = String.format("%s file metadata%s from path %s",
-          oldFdsByName_.isEmpty() ? "Loading" : "Refreshing",
+          oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
           listWithLocations ? " with eager location-fetching" : "",
           partDir_);
     LOG.trace(msg);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
       RemoteIterator<? extends FileStatus> fileStatuses;
       if (listWithLocations) {
-        fileStatuses = FileSystemUtil.listFiles(fs, partDir_, /*recursive=*/false);
+        fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_);
       } else {
-        fileStatuses = FileSystemUtil.listStatus(fs, partDir_);
+        fileStatuses = FileSystemUtil.listStatus(fs, partDir_, recursive_);
 
         // TODO(todd): we could look at the result of listing without locations, and if
         // we see that a substantial number of the files have changed, it may be better
@@ -149,14 +153,11 @@ public class FileMetadataLoader {
           ++loadStats_.hiddenFiles;
           continue;
         }
-        // TODO(todd): this logic will have to change when we support recursive partition
-        // listing -- we need to index the old FDs by their relative path to the partition
-        // directory, not just the file name (last path component)
-        String fileName = fileStatus.getPath().getName().toString();
-        FileDescriptor fd = oldFdsByName_.get(fileName);
+        String relPath = relativizePath(fileStatus.getPath());
+        FileDescriptor fd = oldFdsByRelPath_.get(relPath);
         if (listWithLocations || forceRefreshLocations ||
             hasFileChanged(fd, fileStatus)) {
-          fd = createFd(fs, fileStatus, numUnknownDiskIds);
+          fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
           ++loadStats_.loadedFiles;
         } else {
           ++loadStats_.skippedFiles;
@@ -171,15 +172,28 @@ public class FileMetadataLoader {
   }
 
   /**
+   * Return the path of 'path' relative to the partition dir being listed. This may
+   * differ from simply the file name in the case of recursive listings.
+   */
+  private String relativizePath(Path path) {
+    URI relUri = partDir_.toUri().relativize(path.toUri());
+    if (relUri.isAbsolute() || relUri.getPath().startsWith("/")) {
+      throw new RuntimeException("FileSystem returned an unexpected path " +
+          path + " for a file within " + partDir_);
+    }
+    return relUri.getPath();
+  }
+
+  /**
    * Create a FileDescriptor for the given FileStatus. If the FS supports block locations,
    * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) this uses
    * the already-loaded information; otherwise, this may have to remotely look up the
    * locations.
    */
   private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
-      Reference<Long> numUnknownDiskIds) throws IOException {
+      String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
     if (!FileSystemUtil.supportsStorageIds(fs)) {
-      return FileDescriptor.createWithNoBlocks(fileStatus);
+      return FileDescriptor.createWithNoBlocks(fileStatus, relPath);
     }
     BlockLocation[] locations;
     if (fileStatus instanceof LocatedFileStatus) {
@@ -187,7 +201,7 @@ public class FileMetadataLoader {
     } else {
       locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
     }
-    return FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+    return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
         HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 8b6f629..b559d44 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -32,7 +32,6 @@ import javax.annotation.Nonnull;
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
@@ -135,12 +134,23 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
      * file resides and 'hostIndex' stores the network addresses of the hosts that store
      * blocks of the parent HdfsTable. 'isEc' indicates whether the file is erasure-coded.
      * Populates 'numUnknownDiskIds' with the number of unknown disk ids.
+     *
+     *
      */
-    public static FileDescriptor create(FileStatus fileStatus,
-        BlockLocation[] blockLocations, FileSystem fileSystem,
-        ListMap<TNetworkAddress> hostIndex, boolean isEc,
+    /**
+     * Creates a FileDescriptor with block locations.
+     *
+     * @param fileStatus the status returned from file listing
+     * @param relPath the path of the file relative to the partition directory
+     * @param blockLocations the block locations for the file
+     * @param hostIndex the host index to use for encoding the hosts
+     * @param isEc true if the file is known to be erasure-coded
+     * @param numUnknownDiskIds reference which will be set to the number of blocks
+     *                          for which no disk ID could be determined
+     */
+    public static FileDescriptor create(FileStatus fileStatus, String relPath,
+        BlockLocation[] blockLocations, ListMap<TNetworkAddress> hostIndex, boolean isEc,
         Reference<Long> numUnknownDiskIds) throws IOException {
-      Preconditions.checkState(FileSystemUtil.supportsStorageIds(fileSystem));
       FlatBufferBuilder fbb = new FlatBufferBuilder(1);
       int[] fbFileBlockOffsets = new int[blockLocations.length];
       int blockIdx = 0;
@@ -154,17 +164,18 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
               FileBlock.createFbFileBlock(fbb, loc, hostIndex, numUnknownDiskIds);
         }
       }
-      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, fbFileBlockOffsets,
-          isEc));
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, relPath,
+          fbFileBlockOffsets, isEc));
     }
 
     /**
      * Creates the file descriptor of a file represented by 'fileStatus' that
      * resides in a filesystem that doesn't support the BlockLocation API (e.g. S3).
      */
-    public static FileDescriptor createWithNoBlocks(FileStatus fileStatus) {
+    public static FileDescriptor createWithNoBlocks(FileStatus fileStatus,
+        String relPath) {
       FlatBufferBuilder fbb = new FlatBufferBuilder(1);
-      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, null, false));
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, relPath, null, false));
     }
 
     /**
@@ -174,17 +185,16 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
      * in the underlying buffer. Can be null if there are no blocks.
      */
     private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
-        FileStatus fileStatus, int[] fbFileBlockOffets, boolean isEc) {
-      // TODO(todd): need to use path relative to the partition dir, not the
-      // filename here.
-      int fileNameOffset = fbb.createString(fileStatus.getPath().getName());
+        FileStatus fileStatus, String relPath, int[] fbFileBlockOffets, boolean isEc) {
+      int relPathOffset = fbb.createString(relPath);
       // A negative block vector offset is used when no block offsets are specified.
       int blockVectorOffset = -1;
       if (fbFileBlockOffets != null) {
         blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, fbFileBlockOffets);
       }
       FbFileDesc.startFbFileDesc(fbb);
-      FbFileDesc.addFileName(fbb, fileNameOffset);
+      // TODO(todd): rename to RelativePath in the FBS
+      FbFileDesc.addRelativePath(fbb, relPathOffset);
       FbFileDesc.addLength(fbb, fileStatus.getLen());
       FbFileDesc.addLastModificationTime(fbb, fileStatus.getModificationTime());
       FbFileDesc.addIsEc(fbb, isEc);
@@ -200,7 +210,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
     }
 
-    public String getFileName() { return fbFileDescriptor_.fileName(); }
+    public String getRelativePath() { return fbFileDescriptor_.relativePath(); }
     public long getFileLength() { return fbFileDescriptor_.length(); }
 
     /** Compute the total length of files in fileDescs */
@@ -239,7 +249,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         blocks.add(FileBlock.debugString(getFbFileBlock(i)));
       }
       return Objects.toStringHelper(this)
-          .add("FileName", getFileName())
+          .add("RelativePath", getRelativePath())
           .add("Length", getFileLength())
           .add("Compression", getFileCompression())
           .add("ModificationTime", getModificationTime())
@@ -248,7 +258,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
     @Override
     public int compareTo(FileDescriptor otherFd) {
-      return getFileName().compareTo(otherFd.getFileName());
+      return getRelativePath().compareTo(otherFd.getRelativePath());
     }
 
     /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index de190d9..b1a3b7a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -560,7 +560,8 @@ public class HdfsTable extends Table implements FeFsTable {
     Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
     for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), oldFds, hostIndex_);
+      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), /*recursive=*/false,
+          oldFds, hostIndex_);
       // If there is a cached partition mapped to this path, we recompute the block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
@@ -1645,7 +1646,8 @@ public class HdfsTable extends Table implements FeFsTable {
       return;
     }
 
-    RemoteIterator<FileStatus> statuses = FileSystemUtil.listStatus(fs, path);
+    RemoteIterator<? extends FileStatus> statuses = FileSystemUtil.listStatus(fs, path,
+        /*recursive=*/false);
     if (statuses == null) return;
     while (statuses.hasNext()) {
       FileStatus status = statuses.next();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index bee5ada..6fbda7c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -286,6 +286,7 @@ class DirectMetaProvider implements MetaProvider {
       String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
     Path partDir = new Path(msPartition.getSd().getLocation());
     FileMetadataLoader fml = new FileMetadataLoader(partDir,
+        /* recursive= */false,
         /* oldFds= */Collections.emptyList(),
         hostIndex);
 
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index c326d23..84a2674 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Stack;
 import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
@@ -539,9 +541,33 @@ public class FileSystemUtil {
    * the file does not exist and also saves an RPC as the caller need not do a separate
    * exists check for the path. Returns null if the path does not exist.
    */
-  public static RemoteIterator<FileStatus> listStatus(FileSystem fs, Path p)
-      throws IOException {
+  public static RemoteIterator<? extends FileStatus> listStatus(FileSystem fs, Path p,
+      boolean recursive) throws IOException {
     try {
+      if (recursive) {
+        // The Hadoop FileSystem API doesn't provide a recursive listStatus call that
+        // doesn't also fetch block locations, and fetching block locations is expensive.
+        // Here, our caller specifically doesn't need block locations, so we don't want to
+        // call the expensive 'listFiles' call on HDFS. Instead, we need to "manually"
+        // recursively call FileSystem.listStatusIterator().
+        //
+        // Note that this "manual" recursion is not actually any slower than the recursion
+        // provided by the HDFS 'listFiles(recursive=true)' API, since the HDFS wire
+        // protocol doesn't provide any such recursive support anyway. In other words,
+        // the API that looks like a single recursive call is just as bad as what we're
+        // doing here.
+        //
+        // However, S3 actually implements 'listFiles(recursive=true)' with a faster path
+        // which natively recurses. In that case, it's quite preferable to use 'listFiles'
+        // even though it returns LocatedFileStatus objects with "fake" blocks which we
+        // will ignore.
+        if (isS3AFileSystem(fs)) {
+          return listFiles(fs, p, recursive);
+        }
+
+        return new RecursingIterator(fs, p);
+      }
+
       return fs.listStatusIterator(p);
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
@@ -569,4 +595,70 @@ public class FileSystemUtil {
     FileSystem fs = getFileSystemForPath(p);
     return fs.isDirectory(p);
   }
+
+  /**
+   * Iterator which recursively visits directories on a FileSystem, yielding
+   * files in an unspecified order. Only files are yielded -- not directories.
+   */
+  private static class RecursingIterator implements RemoteIterator<FileStatus> {
+    private final FileSystem fs_;
+    private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
+    private RemoteIterator<FileStatus> curIter_;
+    private FileStatus curFile_;
+
+    private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+      this.fs_ = Preconditions.checkNotNull(fs);
+      curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath));
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      // Pull the next file to be returned into 'curFile'. If we've already got one,
+      // we don't need to do anything (extra calls to hasNext() must not affect
+      // state)
+      while (curFile_ == null) {
+        if (curIter_.hasNext()) {
+          // Consume the next file or directory from the current iterator.
+          handleFileStat(curIter_.next());
+        } else if (!iters_.empty()) {
+          // We ran out of entries in the current one, but we might still have
+          // entries at a higher level of recursion.
+          curIter_ = iters_.pop();
+        } else {
+          // No iterators left to process, so we are entirely done.
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Process the input stat.
+     * If it is a file, buffer it as the next entry to be returned.
+     * If it is a directory, descend into it by pushing the current iterator
+     * onto the stack and listing the subdirectory.
+     * @param fileStatus input status
+     * @throws IOException if any IO error occurs
+     */
+    private void handleFileStat(FileStatus fileStatus) throws IOException {
+      if (fileStatus.isFile()) { // file
+        curFile_ = fileStatus;
+      } else { // directory
+        iters_.push(curIter_);
+        curIter_ = fs_.listStatusIterator(fileStatus.getPath());
+      }
+    }
+
+    @Override
+    public FileStatus next() throws IOException {
+      if (hasNext()) {
+        FileStatus result = curFile_;
+        // Reset back to 'null' so that hasNext() will pull a new entry on the next
+        // call.
+        curFile_ = null;
+        return result;
+      }
+      throw new NoSuchElementException("No more entries");
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b58cb98..ce5c850 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -878,7 +878,7 @@ public class HdfsScanNode extends ScanNode {
             fileDesc.getIsEc()) {
           throw new ImpalaRuntimeException(String.format(
               "Scanning of HDFS erasure-coded file (%s/%s) is not supported",
-              partition.getLocation(), fileDesc.getFileName()));
+              partition.getLocation(), fileDesc.getRelativePath()));
         }
         if (!fsHasBlocks) {
           Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
@@ -945,7 +945,7 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkArgument(maxBlockSize > 0);
     if (fileDesc.getFileLength() <= 0) return;
     boolean splittable = partition.getFileFormat().isSplittable(
-        HdfsCompression.fromFileName(fileDesc.getFileName()));
+        HdfsCompression.fromFileName(fileDesc.getRelativePath()));
     TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
         fileDesc.toThrift(), maxBlockSize, splittable, partition.getId());
     scanRangeSpecs_.addToSplit_specs(splitSpec);
@@ -1010,7 +1010,7 @@ public class HdfsScanNode extends ScanNode {
           currentLength = scanRangeBytesLimit;
         }
         TScanRange scanRange = new TScanRange();
-        scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
+        scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getRelativePath(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
             fileDesc.getIsEc()));
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
new file mode 100644
index 0000000..d2e974b
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.junit.Test;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+
+
+public class FileMetadataLoaderTest {
+
+  @Test
+  public void testRecursiveLoading() throws IOException {
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
+    FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
+        /* oldFds = */Collections.emptyList(), hostIndex);
+    fml.load();
+    assertEquals(24, fml.getStats().loadedFiles);
+    assertEquals(24, fml.getLoadedFds().size());
+
+    // Test that relative paths are constructed properly.
+    ArrayList<String> relPaths = new ArrayList<>(Collections2.transform(
+        fml.getLoadedFds(), FileDescriptor::getRelativePath));
+    Collections.sort(relPaths);
+    assertEquals("year=2009/month=1/090101.txt", relPaths.get(0));
+    assertEquals("year=2010/month=9/100901.txt", relPaths.get(23));
+
+    // Test that refreshing is properly incremental if no files changed.
+    FileMetadataLoader refreshFml = new FileMetadataLoader(tablePath, /* recursive=*/true,
+        /* oldFds = */fml.getLoadedFds(), hostIndex);
+    refreshFml.load();
+    assertEquals(24, refreshFml.getStats().skippedFiles);
+    assertEquals(0, refreshFml.getStats().loadedFiles);
+    assertEquals(fml.getLoadedFds(), refreshFml.getLoadedFds());
+
+    // Touch a file and make sure that we reload locations for that file.
+    FileSystem fs = tablePath.getFileSystem(new Configuration());
+    FileDescriptor fd = fml.getLoadedFds().get(0);
+    Path filePath = new Path(tablePath, fd.getRelativePath());
+    fs.setTimes(filePath, fd.getModificationTime() + 1, /* atime= */-1);
+
+    refreshFml = new FileMetadataLoader(tablePath, /* recursive=*/true,
+        /* oldFds = */fml.getLoadedFds(), hostIndex);
+    refreshFml.load();
+    assertEquals(1, refreshFml.getStats().loadedFiles);
+  }
+
+  @Test
+  public void testLoadMissingDirectory() throws IOException {
+    for (boolean recursive : ImmutableList.of(false, true)) {
+      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+      Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/does-not-exist/");
+      FileMetadataLoader fml = new FileMetadataLoader(tablePath, recursive,
+          /* oldFds = */Collections.emptyList(), hostIndex);
+      fml.load();
+      assertEquals(0, fml.getLoadedFds().size());
+    }
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
index 5e518fc..069d39e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
@@ -150,8 +150,8 @@ public class HdfsPartitionTest {
     // Fetch some metadata from a directory in HDFS.
     Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
     ListMap<TNetworkAddress> origIndex = new ListMap<>();
-    FileMetadataLoader fml = new FileMetadataLoader(p, Collections.emptyList(),
-        origIndex);
+    FileMetadataLoader fml = new FileMetadataLoader(p, /* recursive= */false,
+        Collections.emptyList(), origIndex);
     fml.load();
     List<FileDescriptor> fileDescriptors = fml.getLoadedFds();
     assertTrue(!fileDescriptors.isEmpty());
diff --git a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
index dcc3bd2..75c2730 100644
--- a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
@@ -156,7 +156,7 @@ public class ExplainTest extends FrontendTestBase {
     List<HdfsPartition.FileDescriptor> mockFilesDescs = new ArrayList<>();
     HdfsPartition.FileDescriptor mockFileDesc = mock(HdfsPartition.FileDescriptor.class);
     when(mockFileDesc.getFileLength()).thenReturn(1L);
-    when(mockFileDesc.getFileName()).thenReturn("");
+    when(mockFileDesc.getRelativePath()).thenReturn("");
     mockFilesDescs.add(mockFileDesc);
 
     when(mockHdfsPartition.getLocationPath())
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 2aa4069..dc420f8 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -290,7 +290,7 @@ public class PlannerTestBase extends FrontendTestBase {
               file_location =
                   table.getPartition_prefixes().get(location.prefix_index) + file_location;
             }
-            Path filePath = new Path(file_location, split.file_name);
+            Path filePath = new Path(file_location, split.relative_path);
             filePath = cleanseFilePath(filePath);
             result.append("HDFS SPLIT " + filePath.toString() + " "
                 + Long.toString(split.offset) + ":" + Long.toString(split.length));
diff --git a/fe/src/test/java/org/apache/impala/testutil/BlockIdGenerator.java b/fe/src/test/java/org/apache/impala/testutil/BlockIdGenerator.java
index ef03d67..6d59e68 100644
--- a/fe/src/test/java/org/apache/impala/testutil/BlockIdGenerator.java
+++ b/fe/src/test/java/org/apache/impala/testutil/BlockIdGenerator.java
@@ -78,7 +78,7 @@ public class BlockIdGenerator {
             for (FeFsPartition partition : parts) {
               List<FileDescriptor> fileDescriptors = partition.getFileDescriptors();
               for (FileDescriptor fd : fileDescriptors) {
-                Path p = new Path(partition.getLocation(), fd.getFileName());
+                Path p = new Path(partition.getLocation(), fd.getRelativePath());
 
                 // Use a deprecated API to get block ids
                 DistributedFileSystem dfs =