You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/10/07 12:59:35 UTC
[hadoop] branch trunk updated: HADOOP-17281 Implement
FileSystem.listStatusIterator() in S3AFileSystem (#2354)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 82522d6 HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)
82522d6 is described below
commit 82522d60fb545b81a70b36455c89694b544e391c
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Wed Oct 7 18:29:06 2020 +0530
HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)
Contains HADOOP-17300: FileSystem.DirListingIterator.next() call should
return NoSuchElementException
Contributed by Mukund Thakur
---
.../main/java/org/apache/hadoop/fs/FileSystem.java | 4 +-
.../src/site/markdown/filesystem/filesystem.md | 18 ++++++
.../AbstractContractGetFileStatusTest.java | 70 ++++++++++++++++++++++
.../AbstractContractRootDirectoryTest.java | 11 ++++
.../hadoop/fs/contract/ContractTestUtils.java | 48 ++++++++++++++-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 24 ++++++++
.../fs/s3a/scale/ITestS3ADirectoryPerformance.java | 18 ++++++
7 files changed, 191 insertions(+), 2 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0de1071..9927247 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2229,7 +2229,9 @@ public abstract class FileSystem extends Configured
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
- Preconditions.checkState(hasNext(), "No more items in iterator");
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more items in iterator");
+ }
if (i == entries.getEntries().length) {
fetchMore();
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 089af06..284a964 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -294,6 +294,24 @@ any optimizations.
The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.
+### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
+
+Return an iterator enumerating the `FileStatus` entries under
+a path. This is similar to `listStatus(Path)`, except that an
+iterator is returned rather than the entire list.
+The result is exactly the same as `listStatus(Path)`, provided no other
+caller updates the directory during the listing. However, there is no
+guarantee of atomicity if other callers add or delete files
+inside the directory while the listing is in progress. Filesystems
+may provide a more efficient implementation; for example, S3A performs the
+listing in pages and fetches the next page asynchronously while the
+current page is being processed.
+
+Note that because the initial listing is asynchronous, an exception reporting
+a missing bucket or path may not be raised until a later `next()` call.
+
+Callers should prefer `listStatusIterator()` over `listStatus()`, as it
+is incremental in nature.
### `FileStatus[] listStatus(Path[] paths)`
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
index f63314d..c0d9733 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import org.assertj.core.api.Assertions;
+
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
@@ -148,6 +150,7 @@ public abstract class AbstractContractGetFileStatusTest extends
public void testComplexDirActions() throws Throwable {
TreeScanResults tree = createTestTree();
checkListStatusStatusComplexDir(tree);
+ checkListStatusIteratorComplexDir(tree);
checkListLocatedStatusStatusComplexDir(tree);
checkListFilesComplexDirNonRecursive(tree);
checkListFilesComplexDirRecursive(tree);
@@ -170,6 +173,34 @@ public abstract class AbstractContractGetFileStatusTest extends
}
/**
+ * Test {@link FileSystem#listStatusIterator(Path)} on a complex
+ * directory tree.
+ * @param tree directory tree to list.
+ * @throws Throwable if the listing or any of the assertions fail.
+ */
+ protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
+ throws Throwable {
+ describe("Expect listStatusIterator to list all entries in top dir only");
+
+ FileSystem fs = getFileSystem();
+ TreeScanResults listing = new TreeScanResults(
+ fs.listStatusIterator(tree.getBasePath()));
+ listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
+
+ List<FileStatus> resWithoutCheckingHasNext =
+ iteratorToListThroughNextCallsAlone(fs
+ .listStatusIterator(tree.getBasePath()));
+
+ List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
+ .listStatusIterator(tree.getBasePath()));
+ Assertions.assertThat(resWithCheckingHasNext)
+ .describedAs("listStatusIterator() should return correct " +
+ "results even if hasNext() calls are not made.")
+ .hasSameElementsAs(resWithoutCheckingHasNext);
+
+ }
+
+ /**
* Test {@link FileSystem#listLocatedStatus(Path)} on a complex
* directory tree.
* @param tree directory tree to list.
@@ -323,6 +354,45 @@ public abstract class AbstractContractGetFileStatusTest extends
}
@Test
+ public void testListStatusIteratorFile() throws Throwable {
+ describe("test the listStatusIterator(path) on a file");
+ Path f = touchf("listStItrFile");
+
+ List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
+ getFileSystem().listStatusIterator(f));
+ validateListingForFile(f, statusList, false);
+
+ List<FileStatus> statusList2 =
+ (List<FileStatus>) iteratorToListThroughNextCallsAlone(
+ getFileSystem().listStatusIterator(f));
+ validateListingForFile(f, statusList2, true);
+ }
+
+ /**
+ * Validate the listing result for an input path which is a file.
+ * @param f file.
+ * @param statusList list status of a file.
+ * @param nextCallAlone whether the listing was generated using
+ * only next() calls.
+ */
+ private void validateListingForFile(Path f,
+ List<FileStatus> statusList,
+ boolean nextCallAlone) {
+ String msg = String.format("size of file list returned using %s should " +
+ "be 1", nextCallAlone ?
+ "next() calls alone" : "hasNext() and next() calls");
+ Assertions.assertThat(statusList)
+ .describedAs(msg)
+ .hasSize(1);
+ Assertions.assertThat(statusList.get(0).getPath())
+ .describedAs("path returned should match with the input path")
+ .isEqualTo(f);
+ Assertions.assertThat(statusList.get(0).isFile())
+ .describedAs("path returned should be a file")
+ .isEqualTo(true);
+ }
+
+ @Test
public void testListFilesFile() throws Throwable {
describe("test the listStatus(path) on a file");
Path f = touchf("listfilesfile");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 27c6933..6eaa56b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -22,13 +22,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +42,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -242,6 +246,13 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
+ "listStatus = " + listStatusResult
+ "listFiles = " + listFilesResult,
fileList.size() <= statuses.length);
+ List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
+ fs.listStatusIterator(root));
+ Assertions.assertThat(statusList)
+ .describedAs("Result of listStatus(/) and listStatusIterator(/)"
+ + " must match")
+ .hasSameElementsAs(Arrays.stream(statuses)
+ .collect(Collectors.toList()));
}
@Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 4789630..39a41d0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1455,6 +1455,52 @@ public class ContractTestUtils extends Assert {
/**
* Convert a remote iterator over file status results into a list.
+ * The utility equivalents in commons collection and guava cannot be
+ * used here, as this is a different interface, one whose operators
+ * can throw IOEs.
+ * @param iterator input iterator
+ * @return the file status entries as a list.
+ * @throws IOException IO problems during the iteration.
+ */
+ public static <T extends FileStatus> List<T> iteratorToList(
+ RemoteIterator<T> iterator) throws IOException {
+ List<T> list = new ArrayList<>();
+ while (iterator.hasNext()) {
+ list.add(iterator.next());
+ }
+ return list;
+ }
+
+
+ /**
+ * Convert a remote iterator over file status results into a list.
+ * This uses {@link RemoteIterator#next()} calls only, expecting
+ * a raised {@link NoSuchElementException} exception to indicate that
+ * the end of the listing has been reached. This iteration strategy is
+ * designed to verify that the implementation of the remote iterator
+ * generates results and terminates consistently with the {@code hasNext/next}
+ * iteration. More succinctly, it verifies that the {@code next()} operator
+ * does not rely on {@code hasNext()} always being called during an iteration.
+ * @param iterator input iterator
+ * @return the status entries as a list.
+ * @throws IOException IO problems
+ */
+ @SuppressWarnings("InfiniteLoopStatement")
+ public static <T extends FileStatus> List<T> iteratorToListThroughNextCallsAlone(
+ RemoteIterator<T> iterator) throws IOException {
+ List<T> list = new ArrayList<>();
+ try {
+ while (true) {
+ list.add(iterator.next());
+ }
+ } catch (NoSuchElementException expected) {
+ // ignored
+ }
+ return list;
+ }
+
+ /**
+ * Convert a remote iterator over file status results into a list.
* This uses {@link RemoteIterator#next()} calls only, expecting
* a raised {@link NoSuchElementException} exception to indicate that
* the end of the listing has been reached. This iteration strategy is
@@ -1602,7 +1648,7 @@ public class ContractTestUtils extends Assert {
* @param results results of the listFiles/listStatus call.
* @throws IOException IO problems during the iteration.
*/
- public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+ public TreeScanResults(RemoteIterator<? extends FileStatus> results)
throws IOException {
while (results.hasNext()) {
add(results.next());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 86f2a88..d7b576a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2644,6 +2644,30 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
+ * Override the superclass implementation so that we benefit from the async
+ * listing done in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
+ * {@inheritDoc}
+ *
+ */
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(Path p)
+ throws FileNotFoundException, IOException {
+ RemoteIterator<S3AFileStatus> listStatusItr = once("listStatus",
+ p.toString(), () -> innerListStatus(p));
+ return new RemoteIterator<FileStatus>() {
+ @Override
+ public boolean hasNext() throws IOException {
+ return listStatusItr.hasNext();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ return listStatusItr.next();
+ }
+ };
+ }
+
+ /**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
index a3cca75..44e3a8a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
@@ -231,6 +231,24 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
"match with original list of files")
.hasSameElementsAs(originalListOfFiles)
.hasSize(numOfPutRequests);
+ // Validate listing using listStatusIterator().
+ NanoTimer timeUsingListStatusItr = new NanoTimer();
+ RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
+ List<String> listUsingListStatusItr = new ArrayList<>();
+ while (lsItr.hasNext()) {
+ listUsingListStatusItr.add(lsItr.next().getPath().toString());
+ Thread.sleep(eachFileProcessingTime);
+ }
+ timeUsingListStatusItr.end("listing %d files using " +
+ "listStatusIterator() api with batch size of %d " +
+ "including %dms of processing time for each file",
+ numOfPutRequests, batchSize, eachFileProcessingTime);
+ Assertions.assertThat(listUsingListStatusItr)
+ .describedAs("Listing results using listStatusIterator() must" +
+ "match with original list of files")
+ .hasSameElementsAs(originalListOfFiles)
+ .hasSize(numOfPutRequests);
+
} finally {
executorService.shutdown();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org