Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/08/10 13:48:47 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #2207: HADOOP-17074 Optimise s3a Listing to be fully asynchronous.

steveloughran commented on a change in pull request #2207:
URL: https://github.com/apache/hadoop/pull/2207#discussion_r467906968



##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -2244,8 +2263,9 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest) {
    * not be saved to the metadata store and
    * fs.s3a.metadatastore.fail.on.write.error=true
    */
+  @VisibleForTesting
   @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
-  PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+  public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)

Review comment:
       Rather than making this method public, get a WriteOperationHelper instance and invoke the PUT through that.
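The suggestion is a general visibility pattern: keep the low-level method package-private and reach it through a helper object instead of widening it to public for tests. The sketch below illustrates that pattern with hypothetical stand-in classes (FakeStore, FakeWriteHelper); they are not the real S3A classes, and their methods are not the WriteOperationHelper API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the filesystem; not the real S3AFileSystem.
final class FakeStore {
  private final List<String> keys = new ArrayList<>();

  // Package-private: stays out of the public API.
  String putObjectDirect(String key) {
    keys.add(key);
    return "etag-" + key;
  }

  List<String> keys() {
    return keys;
  }

  // Hands out a helper bound to this store.
  FakeWriteHelper createWriteHelper() {
    return new FakeWriteHelper(this);
  }
}

// Hypothetical stand-in for a write helper living in the same package.
final class FakeWriteHelper {
  private final FakeStore owner;

  FakeWriteHelper(FakeStore owner) {
    this.owner = owner;
  }

  // Entry point tests call; delegates to the package-private method.
  public String putObject(String key) {
    return owner.putObjectDirect(key);
  }
}

class WriteHelperDemo {
  public static void main(String[] args) {
    FakeStore store = new FakeStore();
    FakeWriteHelper helper = store.createWriteHelper();
    System.out.println(helper.putObject("dir/file-000")); // prints etag-dir/file-000
  }
}
```

The test then depends only on the helper, so putObjectDirect can keep its original visibility and annotations.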

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
##########
@@ -137,6 +159,78 @@ public void testListOperations() throws Throwable {
     }
   }
 
+  @Test
+  public void testMultiPagesListingPerformanceAndCorrectness() throws Throwable {
+    describe("Check performance and correctness for multi page listing " +
+            "using different listing api");
+    Path dir = path(this.getMethodName());
+    Configuration conf = getConfigurationWithConfiguredBatchSize(10);
+    fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf);
+    assume("Test is only for raw fs", !fs.hasMetadataStore());
+    fs.create(dir);
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+    final int numOfPutRequests = 500;
+    final List<String> originalListOfFiles = new ArrayList<>();
+    List<Callable<PutObjectResult>> putObjectRequests = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(50);
+    try {
+      for (int i=0; i<numOfPutRequests; i++) {
+        Path file = new Path(dir, String.format("file-%03d", i));
+        originalListOfFiles.add(file.toString());
+        ObjectMetadata om = fs.newObjectMetadata(0L);
+        PutObjectRequest put = new PutObjectRequest(fs.getBucket(),
+                fs.pathToKey(file),
+                im,
+                om);
+        putObjectRequests.add(() -> fs.putObjectDirect(put));
+      }
+      executorService.invokeAll(putObjectRequests);

Review comment:
       Add some tracking/logging of how long it took to create all these files. 
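One way to act on this is to record a timestamp before invokeAll and log the elapsed time afterwards. The sketch below uses only the JDK; the tasks are stand-ins for the PUT requests, and runTimed is a hypothetical helper. It also calls get() on every returned future, because invokeAll does not rethrow task failures, so a failed upload would otherwise go unnoticed. Hadoop test code may prefer org.apache.hadoop.util.DurationInfo for the timing; the sketch avoids it so that it runs standalone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TimedUploadsDemo {

  // Runs all tasks on a fixed pool and returns the elapsed wall-clock time in ms.
  static long runTimed(List<Callable<String>> tasks, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    long start = System.nanoTime();
    try {
      List<Future<String>> results = pool.invokeAll(tasks);
      // invokeAll does not rethrow task failures; get() surfaces them
      // as an ExecutionException.
      for (Future<String> f : results) {
        f.get();
      }
    } finally {
      pool.shutdown();
    }
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.printf("Created %d files in %d ms%n", tasks.size(), elapsedMs);
    return elapsedMs;
  }

  public static void main(String[] args) throws Exception {
    List<Callable<String>> tasks = new ArrayList<>();
    for (int i = 0; i < 500; i++) {
      final String key = String.format("file-%03d", i);
      // Stand-in for one PUT request; the real test would upload here.
      tasks.add(() -> key);
    }
    runTimed(tasks, 50);
  }
}
```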

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
##########
@@ -761,29 +777,44 @@ public boolean hasNext() throws IOException {
     @Retries.RetryTranslated
     public S3ListResult next() throws IOException {
       if (firstListing) {
-        // on the first listing, don't request more data.
-        // Instead just clear the firstListing flag so that it future calls
-        // will request new data.
+        // clear the firstListing flag for future calls.
         firstListing = false;
+        // Calculating the result of last async list call.
+        objects = awaitFuture(s3ListResultFuture);
+        fetchNextBatchAsyncIfPresent();
       } else {
         try {
-          if (!objects.isTruncated()) {
+          if (objectsPrev!= null && !objectsPrev.isTruncated()) {
             // nothing more to request: fail.
             throw new NoSuchElementException("No more results in listing of "
-                + listPath);
+                    + listPath);
           }
-          // need to request a new set of objects.
+          // Calculating the result of last async list call.
+          objects = awaitFuture(s3ListResultFuture);
+          // Requesting next batch of results.
           LOG.debug("[{}], Requesting next {} objects under {}",

Review comment:
       Move the log statement into fetchNextBatchAsyncIfPresent().
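With the log statement inside fetchNextBatchAsyncIfPresent(), every request, including the first one, is logged in one place. The sketch below is a hypothetical, heavily simplified model of a prefetching page iterator: pages are an in-memory list standing in for successive list responses, and CompletableFuture.join() stands in for the diff's awaitFuture(). It is not the Listing class from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

// Hypothetical model of a paged listing; each "page" is one batch of keys.
final class PrefetchingPageIterator {
  private static final Logger LOG = Logger.getLogger("PrefetchingPageIterator");

  private final List<List<String>> pages; // stand-in for successive list responses
  private final ExecutorService executor;
  private int nextPageIndex = 0;          // index of the next page to request
  private CompletableFuture<List<String>> pending; // in-flight request, or null
  final List<Integer> requested = new ArrayList<>(); // request record, for tests

  PrefetchingPageIterator(List<List<String>> pages, ExecutorService executor) {
    this.pages = pages;
    this.executor = executor;
    fetchNextBatchAsyncIfPresent(); // start the first request immediately
  }

  // Issues the next request in the background. The log line lives here,
  // so the first request and all later ones are logged the same way.
  private void fetchNextBatchAsyncIfPresent() {
    if (nextPageIndex < pages.size()) {
      final int index = nextPageIndex++;
      LOG.fine(() -> "Requesting page " + index);
      requested.add(index);
      pending = CompletableFuture.supplyAsync(() -> pages.get(index), executor);
    } else {
      pending = null;
    }
  }

  boolean hasNext() {
    return pending != null;
  }

  List<String> next() {
    if (pending == null) {
      throw new NoSuchElementException("No more results in listing");
    }
    List<String> current = pending.join(); // wait for the in-flight request
    fetchNextBatchAsyncIfPresent();         // overlap the next request with the caller's work
    return current;
  }
}

class PrefetchDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      PrefetchingPageIterator it = new PrefetchingPageIterator(
          Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")), pool);
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    } finally {
      pool.shutdown();
    }
  }
}
```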

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
##########
@@ -137,6 +159,78 @@ public void testListOperations() throws Throwable {
     }
   }
 
+  @Test
+  public void testMultiPagesListingPerformanceAndCorrectness() throws Throwable {
+    describe("Check performance and correctness for multi page listing " +
+            "using different listing api");
+    Path dir = path(this.getMethodName());

Review comment:
       Use methodPath() here.

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
##########
@@ -137,6 +159,78 @@ public void testListOperations() throws Throwable {
     }
   }
 
+  @Test
+  public void testMultiPagesListingPerformanceAndCorrectness() throws Throwable {
+    describe("Check performance and correctness for multi page listing " +
+            "using different listing api");
+    Path dir = path(this.getMethodName());
+    Configuration conf = getConfigurationWithConfiguredBatchSize(10);
+    fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf);
+    assume("Test is only for raw fs", !fs.hasMetadataStore());
+    fs.create(dir);
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+    final int numOfPutRequests = 500;
+    final List<String> originalListOfFiles = new ArrayList<>();
+    List<Callable<PutObjectResult>> putObjectRequests = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(50);
+    try {
+      for (int i=0; i<numOfPutRequests; i++) {
+        Path file = new Path(dir, String.format("file-%03d", i));
+        originalListOfFiles.add(file.toString());
+        ObjectMetadata om = fs.newObjectMetadata(0L);
+        PutObjectRequest put = new PutObjectRequest(fs.getBucket(),
+                fs.pathToKey(file),
+                im,
+                om);
+        putObjectRequests.add(() -> fs.putObjectDirect(put));
+      }
+      executorService.invokeAll(putObjectRequests);
+    } finally {
+      executorService.shutdown();
+    }
+    RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =

Review comment:
       Use a shorter variable name so the following lines are not so wide.

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -1956,6 +1956,14 @@ protected S3ListResult listObjects(S3ListRequest request) throws IOException {
     }
   }
 
+  protected CompletableFuture<S3ListResult> listObjectsAsync(S3ListRequest request) {

Review comment:
       Make this private unless someone needs to access it in Mockito tests.
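A private async method works well when its only callers are in the same class. The sketch below shows one JDK-only shape for it, under the assumption that the underlying blocking call throws a checked IOException: supplyAsync cannot throw checked exceptions, so the sketch wraps the IOException and unwraps it again when waiting. AsyncListingStore and its methods are hypothetical; the PR itself uses Hadoop's own future helpers (such as the awaitFuture() call in the diff).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a store that lists objects.
class AsyncListingStore {
  private final ExecutorService boundedPool;

  AsyncListingStore(ExecutorService boundedPool) {
    this.boundedPool = boundedPool;
  }

  // Blocking call that may throw a checked IOException.
  String listObjects(String prefix) throws IOException {
    if (prefix.isEmpty()) {
      throw new IOException("empty prefix");
    }
    return "listing of " + prefix;
  }

  // Private: only code in this class calls it.
  private CompletableFuture<String> listObjectsAsync(String prefix) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return listObjects(prefix);
      } catch (IOException e) {
        // supplyAsync cannot throw checked exceptions, so wrap it.
        throw new UncheckedIOException(e);
      }
    }, boundedPool);
  }

  // Waits for the result and rethrows the original IOException.
  String list(String prefix) throws IOException {
    try {
      return listObjectsAsync(prefix).join();
    } catch (CompletionException e) {
      if (e.getCause() instanceof UncheckedIOException) {
        throw ((UncheckedIOException) e.getCause()).getCause();
      }
      throw e;
    }
  }
}

class AsyncListingDemo {
  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      System.out.println(new AsyncListingStore(pool).list("dir/")); // prints listing of dir/
    } finally {
      pool.shutdown();
    }
  }
}
```

If a Mockito test later needs to stub the async call, that would be the point to relax the visibility, as the comment says.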

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java
##########
@@ -137,6 +159,78 @@ public void testListOperations() throws Throwable {
     }
   }
 
+  @Test
+  public void testMultiPagesListingPerformanceAndCorrectness() throws Throwable {
+    describe("Check performance and correctness for multi page listing " +
+            "using different listing api");
+    Path dir = path(this.getMethodName());
+    Configuration conf = getConfigurationWithConfiguredBatchSize(10);
+    fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf);

Review comment:
       Any reason not to use a local variable here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.