You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/10/07 12:59:35 UTC

[hadoop] branch trunk updated: HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82522d6  HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)
82522d6 is described below

commit 82522d60fb545b81a70b36455c89694b544e391c
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Wed Oct 7 18:29:06 2020 +0530

    HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)
    
    
    Contains HADOOP-17300: FileSystem.DirListingIterator.next() call should
    throw NoSuchElementException
    
    Contributed by Mukund Thakur
---
 .../main/java/org/apache/hadoop/fs/FileSystem.java |  4 +-
 .../src/site/markdown/filesystem/filesystem.md     | 18 ++++++
 .../AbstractContractGetFileStatusTest.java         | 70 ++++++++++++++++++++++
 .../AbstractContractRootDirectoryTest.java         | 11 ++++
 .../hadoop/fs/contract/ContractTestUtils.java      | 48 ++++++++++++++-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 24 ++++++++
 .../fs/s3a/scale/ITestS3ADirectoryPerformance.java | 18 ++++++
 7 files changed, 191 insertions(+), 2 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0de1071..9927247 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2229,7 +2229,9 @@ public abstract class FileSystem extends Configured
     @Override
     @SuppressWarnings("unchecked")
     public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more items in iterator");
+      }
       if (i == entries.getEntries().length) {
         fetchMore();
       }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 089af06..284a964 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -294,6 +294,24 @@ any optimizations.
 The atomicity and consistency constraints are as for
 `listStatus(Path, PathFilter)`.
 
+### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
+
+Return an iterator enumerating the `FileStatus` entries under
+a path. This is similar to `listStatus(Path)`, except that an
+iterator is returned rather than the entire list.
+The result is exactly the same as `listStatus(Path)`, provided no other
+caller updates the directory during the listing. However, atomicity is
+not guaranteed: other callers may add or delete files inside the
+directory while the listing is being performed.
+Filesystems may provide a more efficient implementation; for example,
+S3A lists in pages and asynchronously fetches the next page while
+the current page is being processed.
+
+Note that because the listing may be asynchronous, an exception caused
+by a missing bucket or path may only be raised by a later `next()` call.
+
+Callers should prefer `listStatusIterator()` over `listStatus()`, as it
+is incremental in nature.
 
 ### `FileStatus[] listStatus(Path[] paths)`
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
index f63314d..c0d9733 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
@@ -148,6 +150,7 @@ public abstract class AbstractContractGetFileStatusTest extends
   public void testComplexDirActions() throws Throwable {
     TreeScanResults tree = createTestTree();
     checkListStatusStatusComplexDir(tree);
+    checkListStatusIteratorComplexDir(tree);
     checkListLocatedStatusStatusComplexDir(tree);
     checkListFilesComplexDirNonRecursive(tree);
     checkListFilesComplexDirRecursive(tree);
@@ -170,6 +173,34 @@ public abstract class AbstractContractGetFileStatusTest extends
   }
 
   /**
+   * Test {@link FileSystem#listStatusIterator(Path)} on a complex
+   * directory tree, using both hasNext()/next() and next()-only iteration.
+   * @param tree directory tree to list.
+   * @throws Throwable on any listing or assertion failure.
+   */
+  protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
+          throws Throwable {
+    describe("Expect listStatusIterator to list all entries in top dir only");
+
+    FileSystem fs = getFileSystem();
+    TreeScanResults listing = new TreeScanResults(
+            fs.listStatusIterator(tree.getBasePath()));
+    listing.assertSizeEquals("listStatusIterator()",
+            TREE_FILES, TREE_WIDTH, 0);
+
+    List<FileStatus> resWithoutCheckingHasNext =
+            iteratorToListThroughNextCallsAlone(fs
+                    .listStatusIterator(tree.getBasePath()));
+    List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
+                    .listStatusIterator(tree.getBasePath()));
+    // Compare with multiplicity, so duplicated or dropped entries fail.
+    Assertions.assertThat(resWithCheckingHasNext)
+            .describedAs("listStatusIterator() should return correct " +
+                    "results even if hasNext() calls are not made.")
+            .containsExactlyInAnyOrderElementsOf(resWithoutCheckingHasNext);
+  }
+
+  /**
    * Test {@link FileSystem#listLocatedStatus(Path)} on a complex
    * directory tree.
    * @param tree directory tree to list.
@@ -323,6 +354,45 @@ public abstract class AbstractContractGetFileStatusTest extends
   }
 
   @Test
+  public void testListStatusIteratorFile() throws Throwable {
+    describe("test the listStatusIterator(path) on a file");
+    Path f = touchf("listStItrFile");
+    // Both iteration strategies must list exactly the file itself.
+    List<FileStatus> statusList = iteratorToList(
+            getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList, false);
+
+    List<FileStatus> statusList2 =
+            iteratorToListThroughNextCallsAlone(
+                    getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList2, true);
+  }
+
+  /**
+   * Validate the listing of a path which refers to a file.
+   * @param f path of the file which was listed.
+   * @param statusList status entries returned by the listing.
+   * @param nextCallAlone whether the listing was generated using only
+   *                      next() calls, i.e. without hasNext().
+   */
+  private void validateListingForFile(Path f,
+                                      List<FileStatus> statusList,
+                                      boolean nextCallAlone) {
+    String msg = String.format("size of file list returned using %s should " +
+            "be 1", nextCallAlone ?
+            "next() calls alone" : "hasNext() and next() calls");
+    Assertions.assertThat(statusList)
+            .describedAs(msg)
+            .hasSize(1);
+    Assertions.assertThat(statusList.get(0).getPath())
+            .describedAs("path returned should match with the input path")
+            .isEqualTo(f);
+    Assertions.assertThat(statusList.get(0).isFile())
+            .describedAs("path returned should be a file")
+            .isTrue();
+  }
+
+  @Test
   public void testListFilesFile() throws Throwable {
     describe("test the listStatus(path) on a file");
     Path f = touchf("listfilesfile");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 27c6933..6eaa56b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -22,13 +22,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +42,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -242,6 +246,13 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
             + "listStatus = " + listStatusResult
             + "listFiles = " + listFilesResult,
         fileList.size() <= statuses.length);
+    List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
+            fs.listStatusIterator(root));
+    Assertions.assertThat(statusList)
+            .describedAs("Result of listStatus(/) and listStatusIterator(/)"
+                    + " must match")
+            .hasSameElementsAs(Arrays.stream(statuses)
+                    .collect(Collectors.toList()));
   }
 
   @Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 4789630..39a41d0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1455,6 +1455,52 @@ public class ContractTestUtils extends Assert {
 
   /**
    * Convert a remote iterator over file status results into a list.
+   * Guava and commons-collections equivalents are not usable here:
+   * {@link RemoteIterator} is a separate interface whose hasNext()
+   * and next() operations may raise IOExceptions.
+   * @param iterator source iterator; drained via hasNext()/next().
+   * @return every entry returned, in iteration order.
+   * @throws IOException if the iterator raises one.
+   */
+  public static <T extends FileStatus> List<T> iteratorToList(
+          RemoteIterator<T> iterator) throws IOException {
+    final List<T> entries = new ArrayList<>();
+    while (iterator.hasNext()) {
+      entries.add(iterator.next());
+    }
+    return entries;
+  }
+
+
+  /**
+   * Convert a remote iterator over file status results into a list.
+   * This uses {@link RemoteIterator#next()} calls only, expecting a
+   * {@link NoSuchElementException} to indicate the end of the listing.
+   * This iteration strategy is designed to verify that the remote
+   * iterator generates results and terminates consistently with
+   * {@code hasNext()/next()} iteration: that is, its {@code next()}
+   * operator does not rely on {@code hasNext()} being called first.
+   * @param iterator input iterator
+   * @return the status entries as a list.
+   * @throws IOException IO problems
+   */
+  @SuppressWarnings("InfiniteLoopStatement")
+  public static <T extends FileStatus> List<T>
+      iteratorToListThroughNextCallsAlone(
+          RemoteIterator<T> iterator) throws IOException {
+    List<T> list = new ArrayList<>();
+    try {
+      while (true) {
+        list.add(iterator.next());
+      }
+    } catch (NoSuchElementException expected) {
+      // expected: next() signals the end of the iteration this way.
+    }
+    return list;
+  }
+
+  /**
+   * Convert a remote iterator over file status results into a list.
    * This uses {@link RemoteIterator#next()} calls only, expecting
    * a raised {@link NoSuchElementException} exception to indicate that
    * the end of the listing has been reached. This iteration strategy is
@@ -1602,7 +1648,7 @@ public class ContractTestUtils extends Assert {
      * @param results results of the listFiles/listStatus call.
      * @throws IOException IO problems during the iteration.
      */
-    public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+    public TreeScanResults(RemoteIterator<? extends FileStatus> results)
         throws IOException {
       while (results.hasNext()) {
         add(results.next());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 86f2a88..d7b576a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2644,6 +2644,30 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * Override the superclass implementation so that callers benefit from
+   * the async listing done in {@code S3AFileSystem}.
+   * See {@code Listing#ObjectListingIterator}. {@inheritDoc}
+   */
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path p)
+          throws FileNotFoundException, IOException {
+    RemoteIterator<S3AFileStatus> listStatusItr = once("listStatus",
+            p.toString(), () -> innerListStatus(p));
+    // Generics are invariant: wrap to expose S3AFileStatus as FileStatus.
+    return new RemoteIterator<FileStatus>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        return listStatusItr.hasNext();
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        return listStatusItr.next();
+      }
+    };
+  }
+
+  /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
    *
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
index a3cca75..44e3a8a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
@@ -231,6 +231,24 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
                       "match with original list of files")
               .hasSameElementsAs(originalListOfFiles)
               .hasSize(numOfPutRequests);
+      // Validate listing using listStatusIterator().
+      NanoTimer timeUsingListStatusItr = new NanoTimer();
+      RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
+      List<String> listUsingListStatusItr = new ArrayList<>();
+      while (lsItr.hasNext()) {
+        listUsingListStatusItr.add(lsItr.next().getPath().toString());
+        Thread.sleep(eachFileProcessingTime);
+      }
+      timeUsingListStatusItr.end("listing %d files using " +
+                      "listStatusIterator() api with batch size of %d " +
+                      "including %dms of processing time for each file",
+              numOfPutRequests, batchSize, eachFileProcessingTime);
+      Assertions.assertThat(listUsingListStatusItr)
+              .describedAs("Listing results using listStatusIterator() must" +
+                      "match with original list of files")
+              .hasSameElementsAs(originalListOfFiles)
+              .hasSize(numOfPutRequests);
+
     } finally {
       executorService.shutdown();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org