You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/11/03 15:21:15 UTC
git commit: HADOOP-10987. Provide an iterator-based listing API for
FileSystem. Contributed by Kihwal Lee.
Repository: hadoop
Updated Branches:
refs/heads/trunk 27715ec63 -> 67f13b58e
HADOOP-10987. Provide an iterator-based listing API for FileSystem. Contributed by Kihwal Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67f13b58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67f13b58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67f13b58
Branch: refs/heads/trunk
Commit: 67f13b58e4d41879845aa118186d984de2e312ed
Parents: 27715ec
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon Nov 3 08:20:22 2014 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon Nov 3 08:20:22 2014 -0600
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../java/org/apache/hadoop/fs/FileSystem.java | 30 ++++
.../org/apache/hadoop/fs/FilterFileSystem.java | 7 +
.../apache/hadoop/fs/FileSystemTestWrapper.java | 28 +--
.../org/apache/hadoop/fs/TestHarFileSystem.java | 1 +
.../hadoop/hdfs/DistributedFileSystem.java | 175 ++++++++++++++-----
.../org/apache/hadoop/hdfs/TestFileStatus.java | 27 ++-
7 files changed, 194 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 689cb45..93e58f6 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -351,6 +351,8 @@ Release 2.7.0 - UNRELEASED
NEW FEATURES
+ HADOOP-10987. Provide an iterator-based listing API for FileSystem (kihwal)
+
IMPROVEMENTS
HADOOP-11156. DelegateToFileSystem should implement
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 1d2270b..9edc54b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1701,6 +1701,36 @@ public abstract class FileSystem extends Configured implements Closeable {
}
/**
+ * Returns a remote iterator so that followup calls are made on demand
+ * while consuming the entries. Each file system implementation should
+ * override this method and provide a more efficient implementation, if
+ * possible.
+ *
+ * @param p target path
+ * @return remote iterator
+ */
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+ throws FileNotFoundException, IOException {
+ return new RemoteIterator<FileStatus>() {
+ private final FileStatus[] stats = listStatus(p);
+ private int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i<stats.length;
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more entry in " + p);
+ }
+ return stats[i++];
+ }
+ };
+ }
+
+ /**
* List the statuses and block locations of the files in the given path.
*
* If the path is a directory,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index e729e67..3d5a753 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -251,6 +251,13 @@ public class FilterFileSystem extends FileSystem {
return fs.listLocatedStatus(f);
}
+ /** Return a remote iterator for listing in a directory */
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(Path f)
+ throws IOException {
+ return fs.listStatusIterator(f);
+ }
+
@Override
public Path getHomeDirectory() {
return fs.getHomeDirectory();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
index 9a5f40e..933ad1a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java
@@ -334,37 +334,11 @@ public final class FileSystemTestWrapper extends FSTestWrapper {
return fs.getFileChecksum(f);
}
- private class FakeRemoteIterator<E> implements RemoteIterator<E> {
-
- private E[] elements;
- private int count;
-
- FakeRemoteIterator(E[] elements) {
- this.elements = elements;
- count = 0;
- }
-
- @Override
- public boolean hasNext() throws IOException {
- return count < elements.length;
- }
-
- @Override
- public E next() throws IOException {
- if (hasNext()) {
- return elements[count++];
- }
- return null;
- }
- }
-
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path f)
throws AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
- // Fake the RemoteIterator, because FileSystem has no such thing
- FileStatus[] statuses = fs.listStatus(f);
- return new FakeRemoteIterator<FileStatus>(statuses);
+ return fs.listStatusIterator(f);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 1e86439..374bb2e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -125,6 +125,7 @@ public class TestHarFileSystem {
public Iterator<LocatedFileStatus> listLocatedStatus(Path f);
public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
PathFilter filter);
+ public Iterator<FileStatus> listStatusIterator(Path f);
public void copyFromLocalFile(Path src, Path dst);
public void moveFromLocalFile(Path[] srcs, Path dst);
public void moveFromLocalFile(Path src, Path dst);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 1ad7ca3..d4653ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem {
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
final PathFilter filter)
throws IOException {
- final Path absF = fixRelativePart(p);
- return new RemoteIterator<LocatedFileStatus>() {
- private DirectoryListing thisListing;
- private int i;
- private String src;
- private LocatedFileStatus curStat = null;
-
- { // initializer
- // Fully resolve symlinks in path first to avoid additional resolution
- // round-trips as we fetch more batches of listings
- src = getPathName(resolvePath(absF));
- // fetch the first batch of entries in the directory
- thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
- statistics.incrementReadOps(1);
- if (thisListing == null) { // the directory does not exist
- throw new FileNotFoundException("File " + p + " does not exist.");
+ Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
+ @Override
+ public RemoteIterator<LocatedFileStatus> doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return new DirListingIterator<LocatedFileStatus>(p, filter, true);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
}
+ // symlink resolution for this method does not work across file systems
+ // because it is a protected method.
+ throw new IOException("Link resolution does not work with multiple " +
+ "file systems for listLocatedStatus(): " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+
+ /**
+ * Returns a remote iterator so that followup calls are made on demand
+ * while consuming the entries. This reduces memory consumption during
+ * listing of a large directory.
+ *
+ * @param p target path
+ * @return remote iterator
+ */
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+ throws IOException {
+ Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
+ @Override
+ public RemoteIterator<FileStatus> doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return new DirListingIterator<FileStatus>(p, false);
}
@Override
- public boolean hasNext() throws IOException {
- while (curStat == null && hasNextNoFilter()) {
- LocatedFileStatus next =
- ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
- .makeQualifiedLocated(getUri(), absF);
- if (filter.accept(next.getPath())) {
- curStat = next;
- }
+ public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
+ throws IOException {
+ return ((DistributedFileSystem)fs).listStatusIterator(p);
+ }
+ }.resolve(this, absF);
+
+ }
+
+ /**
+ * This class defines an iterator that returns
+ * the file status of each file/subdirectory of a directory
+ *
+ * if needLocation, status contains block location if it is a file
+ * throws a RuntimeException with the error as its cause.
+ *
+ * @param <T> the type of the file status
+ */
+ private class DirListingIterator<T extends FileStatus>
+ implements RemoteIterator<T> {
+ private DirectoryListing thisListing;
+ private int i;
+ private Path p;
+ private String src;
+ private T curStat = null;
+ private PathFilter filter;
+ private boolean needLocation;
+
+ private DirListingIterator(Path p, PathFilter filter,
+ boolean needLocation) throws IOException {
+ this.p = p;
+ this.src = getPathName(p);
+ this.filter = filter;
+ this.needLocation = needLocation;
+ // fetch the first batch of entries in the directory
+ thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
+ needLocation);
+ statistics.incrementReadOps(1);
+ if (thisListing == null) { // the directory does not exist
+ throw new FileNotFoundException("File " + p + " does not exist.");
+ }
+ i = 0;
+ }
+
+ private DirListingIterator(Path p, boolean needLocation)
+ throws IOException {
+ this(p, null, needLocation);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean hasNext() throws IOException {
+ while (curStat == null && hasNextNoFilter()) {
+ T next;
+ HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
+ if (needLocation) {
+ next = (T)((HdfsLocatedFileStatus)fileStat)
+ .makeQualifiedLocated(getUri(), p);
+ } else {
+ next = (T)fileStat.makeQualified(getUri(), p);
+ }
+ // apply filter if not null
+ if (filter == null || filter.accept(next.getPath())) {
+ curStat = next;
}
- return curStat != null;
}
+ return curStat != null;
+ }
- /** Check if there is a next item before applying the given filter */
- private boolean hasNextNoFilter() throws IOException {
+ /** Check if there is a next item before applying the given filter */
+ private boolean hasNextNoFilter() throws IOException {
+ if (thisListing == null) {
+ return false;
+ }
+ if (i >= thisListing.getPartialListing().length
+ && thisListing.hasMore()) {
+ // current listing is exhausted & fetch a new listing
+ thisListing = dfs.listPaths(src, thisListing.getLastName(),
+ needLocation);
+ statistics.incrementReadOps(1);
if (thisListing == null) {
return false;
}
- if (i>=thisListing.getPartialListing().length
- && thisListing.hasMore()) {
- // current listing is exhausted & fetch a new listing
- thisListing = dfs.listPaths(src, thisListing.getLastName(), true);
- statistics.incrementReadOps(1);
- if (thisListing == null) {
- return false;
- }
- i = 0;
- }
- return (i<thisListing.getPartialListing().length);
+ i = 0;
}
+ return (i < thisListing.getPartialListing().length);
+ }
- @Override
- public LocatedFileStatus next() throws IOException {
- if (hasNext()) {
- LocatedFileStatus tmp = curStat;
- curStat = null;
- return tmp;
- }
- throw new java.util.NoSuchElementException("No more entry in " + p);
- }
- };
+ @Override
+ public T next() throws IOException {
+ if (hasNext()) {
+ T tmp = curStat;
+ curStat = null;
+ return tmp;
+ }
+ throw new java.util.NoSuchElementException("No more entry in " + p);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67f13b58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
index 55a871e..d6decd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
@@ -209,6 +209,9 @@ public class TestFileStatus {
RemoteIterator<FileStatus> itor = fc.listStatus(dir);
assertFalse(dir + " should be empty", itor.hasNext());
+ itor = fs.listStatusIterator(dir);
+ assertFalse(dir + " should be empty", itor.hasNext());
+
// create another file that is smaller than a block.
Path file2 = new Path(dir, "filestatus2.dat");
DFSTestUtil.createFile(fs, file2, blockSize/4, blockSize/4, blockSize,
@@ -246,6 +249,12 @@ public class TestFileStatus {
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected additional file", itor.hasNext());
+ itor = fs.listStatusIterator(dir);
+ assertEquals(file2.toString(), itor.next().getPath().toString());
+ assertEquals(file3.toString(), itor.next().getPath().toString());
+ assertFalse("Unexpected additional file", itor.hasNext());
+
+
// Test iterative listing. Now dir has 2 entries, create one more.
Path dir3 = fs.makeQualified(new Path(dir, "dir3"));
fs.mkdirs(dir3);
@@ -262,6 +271,12 @@ public class TestFileStatus {
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected additional file", itor.hasNext());
+ itor = fs.listStatusIterator(dir);
+ assertEquals(dir3.toString(), itor.next().getPath().toString());
+ assertEquals(file2.toString(), itor.next().getPath().toString());
+ assertEquals(file3.toString(), itor.next().getPath().toString());
+ assertFalse("Unexpected additional file", itor.hasNext());
+
// Now dir has 3 entries, create two more
Path dir4 = fs.makeQualified(new Path(dir, "dir4"));
fs.mkdirs(dir4);
@@ -284,7 +299,17 @@ public class TestFileStatus {
assertEquals(file2.toString(), itor.next().getPath().toString());
assertEquals(file3.toString(), itor.next().getPath().toString());
- assertFalse(itor.hasNext());
+ assertFalse(itor.hasNext());
+
+ itor = fs.listStatusIterator(dir);
+ assertEquals(dir3.toString(), itor.next().getPath().toString());
+ assertEquals(dir4.toString(), itor.next().getPath().toString());
+ assertEquals(dir5.toString(), itor.next().getPath().toString());
+ assertEquals(file2.toString(), itor.next().getPath().toString());
+ assertEquals(file3.toString(), itor.next().getPath().toString());
+
+ assertFalse(itor.hasNext());
+
fs.delete(dir, true);
}