Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/11/03 15:21:15 UTC

git commit: HADOOP-10987. Provide an iterator-based listing API for FileSystem. Contributed by Kihwal Lee.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 27715ec63 -> 67f13b58e


HADOOP-10987. Provide an iterator-based listing API for FileSystem. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67f13b58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67f13b58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67f13b58

Branch: refs/heads/trunk
Commit: 67f13b58e4d41879845aa118186d984de2e312ed
Parents: 27715ec
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon Nov 3 08:20:22 2014 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon Nov 3 08:20:22 2014 -0600

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../java/org/apache/hadoop/fs/FileSystem.java   |  30 ++++
 .../org/apache/hadoop/fs/FilterFileSystem.java  |   7 +
 .../apache/hadoop/fs/FileSystemTestWrapper.java |  28 +--
 .../org/apache/hadoop/fs/TestHarFileSystem.java |   1 +
 .../hadoop/hdfs/DistributedFileSystem.java      | 175 ++++++++++++++-----
 .../org/apache/hadoop/hdfs/TestFileStatus.java  |  27 ++-
 7 files changed, 194 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
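
For reference, a minimal sketch of how a client consumes the new API once
this change is in. The class name and the path /user/example/dir are
illustrative, not part of this commit; with DistributedFileSystem the
entries arrive from the namenode in batches as the iterator is consumed.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListStatusIteratorExample {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // Unlike listStatus(), the directory need not be materialized as
        // one big FileStatus[] when the file system overrides the iterator.
        RemoteIterator<FileStatus> it =
            fs.listStatusIterator(new Path("/user/example/dir"));
        while (it.hasNext()) {
          FileStatus stat = it.next();
          System.out.println(stat.getPath() + "\t" + stat.getLen());
        }
      }
    }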


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 689cb45..93e58f6 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -351,6 +351,8 @@ Release 2.7.0 - UNRELEASED
 
   NEW FEATURES
 
+    HADOOP-10987. Provide an iterator-based listing API for FileSystem (kihwal)
+
   IMPROVEMENTS
 
     HADOOP-11156. DelegateToFileSystem should implement

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 1d2270b..9edc54b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1701,6 +1701,36 @@ public abstract class FileSystem extends Configured implements Closeable {
   }
 
   /**
+   * Returns a remote iterator so that follow-up calls are made on demand
+   * while consuming the entries. Each file system implementation should
+   * override this method and provide a more efficient implementation, if
+   * possible.
+   *
+   * @param p target path
+   * @return remote iterator
+   */
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+  throws FileNotFoundException, IOException {
+    return new RemoteIterator<FileStatus>() {
+      private final FileStatus[] stats = listStatus(p);
+      private int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        return i < stats.length;
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        if (!hasNext()) {
+          throw new NoSuchElementException("No more entries in " + p);
+        }
+        return stats[i++];
+      }
+    };
+  }
+
+  /**
    * List the statuses and block locations of the files in the given path.
    * 
    * If the path is a directory, 

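Note that this default implementation is a thin adapter: the listStatus(p)
call in the field initializer still fetches the whole FileStatus[] up front,
so the on-demand benefit only materializes in file systems that override the
method (see the DistributedFileSystem change below). Callers that do want
everything in memory can simply drain the iterator; a small helper sketch
(the class and method names here are ours, not part of this commit):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.RemoteIterator;

    public final class RemoteIterators {
      private RemoteIterators() {}

      // Drain a RemoteIterator into a List. This gives up the
      // incremental-fetch advantage, so reserve it for small listings.
      public static List<FileStatus> toList(RemoteIterator<FileStatus> it)
          throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        while (it.hasNext()) {
          result.add(it.next());
        }
        return result;
      }
    }
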
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index e729e67..3d5a753 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -251,6 +251,13 @@ public class FilterFileSystem extends FileSystem {
     return fs.listLocatedStatus(f);
   }
   
+  /** Return a remote iterator for listing the entries in a directory. */
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path f)
+  throws IOException {
+    return fs.listStatusIterator(f);
+  }
+
   @Override
   public Path getHomeDirectory() {
     return fs.getHomeDirectory();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
index 9a5f40e..933ad1a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
@@ -334,37 +334,11 @@ public final class FileSystemTestWrapper extends FSTestWrapper {
     return fs.getFileChecksum(f);
   }
 
-  private class FakeRemoteIterator<E> implements RemoteIterator<E> {
-
-    private E[] elements;
-    private int count;
-
-    FakeRemoteIterator(E[] elements) {
-      this.elements = elements;
-      count = 0;
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-      return count < elements.length;
-    }
-
-    @Override
-    public E next() throws IOException {
-      if (hasNext()) {
-        return elements[count++];
-      }
-      return null;
-    }
-  }
-
   @Override
   public RemoteIterator<FileStatus> listStatusIterator(Path f)
       throws AccessControlException, FileNotFoundException,
       UnsupportedFileSystemException, IOException {
-    // Fake the RemoteIterator, because FileSystem has no such thing
-    FileStatus[] statuses = fs.listStatus(f);
-    return new FakeRemoteIterator<FileStatus>(statuses);
+    return fs.listStatusIterator(f);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 1e86439..374bb2e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -125,6 +125,7 @@ public class TestHarFileSystem {
     public Iterator<LocatedFileStatus> listLocatedStatus(Path f);
     public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
         PathFilter filter);
+    public Iterator<FileStatus> listStatusIterator(Path f);
     public void copyFromLocalFile(Path src, Path dst);
     public void moveFromLocalFile(Path[] srcs, Path dst);
     public void moveFromLocalFile(Path src, Path dst);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 1ad7ca3..d4653ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem {
   protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
       final PathFilter filter)
   throws IOException {
-    final Path absF = fixRelativePart(p);
-    return new RemoteIterator<LocatedFileStatus>() {
-      private DirectoryListing thisListing;
-      private int i;
-      private String src;
-      private LocatedFileStatus curStat = null;
-
-      { // initializer
-        // Fully resolve symlinks in path first to avoid additional resolution
-        // round-trips as we fetch more batches of listings
-        src = getPathName(resolvePath(absF));
-        // fetch the first batch of entries in the directory
-        thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
-        statistics.incrementReadOps(1);
-        if (thisListing == null) { // the directory does not exist
-          throw new FileNotFoundException("File " + p + " does not exist.");
+    Path absF = fixRelativePart(p);
+    return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
+      @Override
+      public RemoteIterator<LocatedFileStatus> doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return new DirListingIterator<LocatedFileStatus>(p, filter, true);
+      }
+
+      @Override
+      public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
         }
+        // Symlink resolution for this method does not work across file
+        // systems because it is a protected method.
+        throw new IOException("Link resolution does not work with multiple " +
+            "file systems for listLocatedStatus(): " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+
+  /**
+   * Returns a remote iterator so that follow-up calls are made on demand
+   * while consuming the entries. This reduces memory consumption when
+   * listing a large directory.
+   *
+   * @param p target path
+   * @return remote iterator
+   */
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+  throws IOException {
+    Path absF = fixRelativePart(p);
+    return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
+      @Override
+      public RemoteIterator<FileStatus> doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return new DirListingIterator<FileStatus>(p, false);
       }
 
       @Override
-      public boolean hasNext() throws IOException {
-        while (curStat == null && hasNextNoFilter()) {
-          LocatedFileStatus next = 
-              ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
-              .makeQualifiedLocated(getUri(), absF);
-          if (filter.accept(next.getPath())) {
-            curStat = next;
-          }
+      public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
+          throws IOException {
+        return ((DistributedFileSystem)fs).listStatusIterator(p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /**
+   * An iterator that returns the file status of each file/subdirectory
+   * of a directory, fetching entries from the namenode in batches as
+   * they are consumed.
+   *
+   * If needLocation is true, the status of a file also includes its
+   * block locations.
+   *
+   * @param <T> the type of the file status
+   */
+  private class DirListingIterator<T extends FileStatus>
+      implements RemoteIterator<T> {
+    private DirectoryListing thisListing;
+    private int i;
+    private Path p;
+    private String src;
+    private T curStat = null;
+    private PathFilter filter;
+    private boolean needLocation;
+
+    private DirListingIterator(Path p, PathFilter filter,
+        boolean needLocation) throws IOException {
+      this.p = p;
+      this.src = getPathName(p);
+      this.filter = filter;
+      this.needLocation = needLocation;
+      // fetch the first batch of entries in the directory
+      thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
+          needLocation);
+      statistics.incrementReadOps(1);
+      if (thisListing == null) { // the directory does not exist
+        throw new FileNotFoundException("File " + p + " does not exist.");
+      }
+      i = 0;
+    }
+
+    private DirListingIterator(Path p, boolean needLocation)
+        throws IOException {
+      this(p, null, needLocation);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean hasNext() throws IOException {
+      while (curStat == null && hasNextNoFilter()) {
+        T next;
+        HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
+        if (needLocation) {
+          next = (T)((HdfsLocatedFileStatus)fileStat)
+              .makeQualifiedLocated(getUri(), p);
+        } else {
+          next = (T)fileStat.makeQualified(getUri(), p);
+        }
+        // apply filter if not null
+        if (filter == null || filter.accept(next.getPath())) {
+          curStat = next;
         }
-        return curStat != null;
       }
+      return curStat != null;
+    }
       
-      /** Check if there is a next item before applying the given filter */
-      private boolean hasNextNoFilter() throws IOException {
+    /** Check if there is a next item before applying the given filter */
+    private boolean hasNextNoFilter() throws IOException {
+      if (thisListing == null) {
+        return false;
+      }
+      if (i >= thisListing.getPartialListing().length
+          && thisListing.hasMore()) {
+        // current listing is exhausted & fetch a new listing
+        thisListing = dfs.listPaths(src, thisListing.getLastName(),
+            needLocation);
+        statistics.incrementReadOps(1);
         if (thisListing == null) {
           return false;
         }
-        if (i>=thisListing.getPartialListing().length
-            && thisListing.hasMore()) { 
-          // current listing is exhausted & fetch a new listing
-          thisListing = dfs.listPaths(src, thisListing.getLastName(), true);
-          statistics.incrementReadOps(1);
-          if (thisListing == null) {
-            return false;
-          }
-          i = 0;
-        }
-        return (i<thisListing.getPartialListing().length);
+        i = 0;
       }
+      return (i < thisListing.getPartialListing().length);
+    }
 
-      @Override
-      public LocatedFileStatus next() throws IOException {
-        if (hasNext()) {
-          LocatedFileStatus tmp = curStat;
-          curStat = null;
-          return tmp;
-        } 
-        throw new java.util.NoSuchElementException("No more entry in " + p);
-      }
-    };
+    @Override
+    public T next() throws IOException {
+      if (hasNext()) {
+        T tmp = curStat;
+        curStat = null;
+        return tmp;
+      } 
+      throw new java.util.NoSuchElementException("No more entries in " + p);
+    }
   }
   
   /**

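The heart of DirListingIterator above is cursor-based paging: each
DirectoryListing returned by dfs.listPaths() carries one batch of entries
plus a hasMore() flag, and the follow-up RPC passes getLastName() as the
start-after cursor. Only one batch is ever resident, which is what bounds
memory regardless of directory size. A self-contained sketch of the same
pattern over an in-memory directory (Listing, fetchAfter and BATCH are
illustrative stand-ins for DirectoryListing, dfs.listPaths and the
server-side listing limit):

    import java.util.Arrays;
    import java.util.List;

    public class PagedListingSketch {
      static final List<String> DIR = Arrays.asList(
          "a", "b", "c", "d", "e", "f", "g");  // the "directory"
      static final int BATCH = 3;              // entries per RPC

      // Stand-in for DirectoryListing: one batch plus a has-more flag.
      static class Listing {
        final List<String> partial;
        final boolean hasMore;
        Listing(List<String> partial, boolean hasMore) {
          this.partial = partial;
          this.hasMore = hasMore;
        }
        String lastName() { return partial.get(partial.size() - 1); }
      }

      // Stand-in for dfs.listPaths(src, startAfter): return the batch of
      // entries strictly after the cursor (null means from the start).
      static Listing fetchAfter(String startAfter) {
        int from = (startAfter == null) ? 0 : DIR.indexOf(startAfter) + 1;
        int to = Math.min(from + BATCH, DIR.size());
        return new Listing(DIR.subList(from, to), to < DIR.size());
      }

      public static void main(String[] args) {
        Listing listing = fetchAfter(null);    // first batch
        int i = 0;
        while (true) {
          if (i >= listing.partial.size()) {
            if (!listing.hasMore) {
              break;                           // listing exhausted
            }
            listing = fetchAfter(listing.lastName());  // next batch
            i = 0;
          }
          System.out.println(listing.partial.get(i++));
        }
      }
    }
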
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
index 55a871e..d6decd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
@@ -209,6 +209,9 @@ public class TestFileStatus {
     RemoteIterator<FileStatus> itor = fc.listStatus(dir);
     assertFalse(dir + " should be empty", itor.hasNext());
 
+    itor = fs.listStatusIterator(dir);
+    assertFalse(dir + " should be empty", itor.hasNext());
+
     // create another file that is smaller than a block.
     Path file2 = new Path(dir, "filestatus2.dat");
     DFSTestUtil.createFile(fs, file2, blockSize/4, blockSize/4, blockSize,
@@ -246,6 +249,12 @@ public class TestFileStatus {
     assertEquals(file3.toString(), itor.next().getPath().toString());
     assertFalse("Unexpected addtional file", itor.hasNext());
 
+    itor = fs.listStatusIterator(dir);
+    assertEquals(file2.toString(), itor.next().getPath().toString());
+    assertEquals(file3.toString(), itor.next().getPath().toString());
+    assertFalse("Unexpected addtional file", itor.hasNext());
+
+
     // Test iterative listing. Now dir has 2 entries, create one more.
     Path dir3 = fs.makeQualified(new Path(dir, "dir3"));
     fs.mkdirs(dir3);
@@ -262,6 +271,12 @@ public class TestFileStatus {
     assertEquals(file3.toString(), itor.next().getPath().toString());
     assertFalse("Unexpected addtional file", itor.hasNext());
 
+    itor = fs.listStatusIterator(dir);
+    assertEquals(dir3.toString(), itor.next().getPath().toString());
+    assertEquals(file2.toString(), itor.next().getPath().toString());
+    assertEquals(file3.toString(), itor.next().getPath().toString());
+    assertFalse("Unexpected addtional file", itor.hasNext());
+
     // Now dir has 3 entries, create two more
     Path dir4 = fs.makeQualified(new Path(dir, "dir4"));
     fs.mkdirs(dir4);
@@ -284,7 +299,17 @@ public class TestFileStatus {
     assertEquals(file2.toString(), itor.next().getPath().toString());
     assertEquals(file3.toString(), itor.next().getPath().toString());
 
-    assertFalse(itor.hasNext());      
+    assertFalse(itor.hasNext());
+
+    itor = fs.listStatusIterator(dir);
+    assertEquals(dir3.toString(), itor.next().getPath().toString());
+    assertEquals(dir4.toString(), itor.next().getPath().toString());
+    assertEquals(dir5.toString(), itor.next().getPath().toString());
+    assertEquals(file2.toString(), itor.next().getPath().toString());
+    assertEquals(file3.toString(), itor.next().getPath().toString());
+
+    assertFalse(itor.hasNext());
 
     fs.delete(dir, true);
   }