You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/09/11 06:39:03 UTC

[incubator-pinot] branch fixing_s3_list_api created (now 8665918)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch fixing_s3_list_api
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 8665918  Fix S3PinotFS List API may not return full results

This branch includes the following new commits:

     new 8665918  Fix S3PinotFS List API may not return full results

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Fix S3PinotFS List API may not return full results

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_s3_list_api
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 86659186e8e2196b199a5c2bb011dd560a5a524f
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Sep 10 23:38:37 2020 -0700

    Fix S3PinotFS List API may not return full results
---
 .../spark/SparkSegmentGenerationJobRunner.java     |  1 +
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 57 ++++++++++++----------
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index ad96e5d..c1b3f25 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -188,6 +188,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
     }
 
+    LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
     try {
       JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
 
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index d70eadc..9d03c47 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
@@ -374,33 +375,39 @@ public class S3PinotFS extends PinotFS {
       throws IOException {
     try {
       ImmutableList.Builder<String> builder = ImmutableList.builder();
+      String continuationToken = null;
+      boolean isDone = false;
       String prefix = normalizeToDirectoryPrefix(fileUri);
-
-      ListObjectsV2Response listObjectsV2Response;
-      ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
-          ListObjectsV2Request.builder().bucket(fileUri.getHost());
-
-      if (!prefix.equals(DELIMITER)) {
-        listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix);
-      }
-
-      if (!recursive) {
-        listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.delimiter(DELIMITER);
-      }
-
-      ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build();
-      listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
-
-      listObjectsV2Response.contents().stream().forEach(object -> {
-        //Only add files and not directories
-        if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) {
-          String fileKey = object.key();
-          if (fileKey.startsWith(DELIMITER)) {
-            fileKey = fileKey.substring(1);
-          }
-          builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+      while(!isDone) {
+        ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
+            ListObjectsV2Request.builder().bucket(fileUri.getHost());
+        if (!prefix.equals(DELIMITER)) {
+          listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix);
+        }
+        if (!recursive) {
+          listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.delimiter(DELIMITER);
         }
-      });
+        if (continuationToken != null) {
+          listObjectsV2RequestBuilder.continuationToken(continuationToken);
+        }
+        ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build();
+        LOGGER.debug("Trying to send ListObjectsV2Request {}", listObjectsV2Request);
+        ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
+        LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
+        List<S3Object> filesReturned = listObjectsV2Response.contents();
+        filesReturned.stream().forEach(object -> {
+          //Only add files and not directories
+          if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) {
+            String fileKey = object.key();
+            if (fileKey.startsWith(DELIMITER)) {
+              fileKey = fileKey.substring(1);
+            }
+            builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+          }
+        });
+        isDone = !listObjectsV2Response.isTruncated();
+        continuationToken = listObjectsV2Response.nextContinuationToken();
+      }
       return builder.build().toArray(new String[0]);
     } catch (Throwable t) {
       throw new IOException(t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org