You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ht...@apache.org on 2020/05/25 11:42:54 UTC

[asterixdb] 01/03: [ASTERIXDB-2736][EXT] Ensure retrieving all objects if an S3 bucket has more than 1000 objects

This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 1a28c658810ded5d50ff88f42ef88ec8f82926e1
Author: Hussain Towaileb <Hu...@Gmail.com>
AuthorDate: Fri May 15 12:50:07 2020 +0300

    [ASTERIXDB-2736][EXT] Ensure retrieving all objects if an S3 bucket has more than 1000 objects
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
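    - Fixed the AWS reader to list every object when the bucket (or the
      definition prefix) holds more than 1000 objects: it now follows
      ListObjectsV2 continuation tokens instead of using only the first
      listing response, which returns at most 1000 keys.
    - Added a test case that uploads 2999 objects and checks that all of
      them are counted.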
    
    Change-Id: Ic7891aa86852e07dfad9ce41de908b34f86bdb42
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6344
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
---
 .../aws/AwsS3ExternalDatasetTest.java              | 18 +++++++++++
 .../over-1000-objects.000.ddl.sqlpp                | 37 ++++++++++++++++++++++
 .../over-1000-objects.001.query.sqlpp              | 22 +++++++++++++
 .../over-1000-objects.099.ddl.sqlpp                | 20 ++++++++++++
 .../s3/over-1000-objects/over-1000-objects.001.adm |  1 +
 .../runtimets/testsuite_external_dataset.xml       |  5 +++
 .../record/reader/aws/AwsS3InputStreamFactory.java | 32 ++++++++++++++++---
 7 files changed, 130 insertions(+), 5 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 55c78e3..23bf3bd 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -103,6 +103,10 @@ public class AwsS3ExternalDatasetTest {
     private static final String S3_MOCK_SERVER_BUCKET_CSV_DEFINITION = "csv-data/reviews/"; // data resides here
     private static final String S3_MOCK_SERVER_BUCKET_TSV_DEFINITION = "tsv-data/reviews/"; // data resides here
 
+    // Key prefix and object count for the over-1000-objects test; the count exceeds one 1000-key S3 listing page
+    private static final String OVER_1000_OBJECTS_PATH = "over-1000-objects";
+    private static final int OVER_1000_OBJECTS_COUNT = 2999;
+
     private static final Set<String> fileNames = new HashSet<>();
     private static final CreateBucketRequest.Builder CREATE_BUCKET_BUILDER = CreateBucketRequest.builder();
     private static final DeleteBucketRequest.Builder DELETE_BUCKET_BUILDER = DeleteBucketRequest.builder();
@@ -213,6 +217,10 @@ public class AwsS3ExternalDatasetTest {
         LOGGER.info("Adding TSV files to the bucket");
         loadTsvFiles();
         LOGGER.info("TSV Files added successfully");
+
+        LOGGER.info("Loading " + OVER_1000_OBJECTS_COUNT + " into " + OVER_1000_OBJECTS_PATH);
+        loadLargeNumberOfFiles();
+        LOGGER.info("Added " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH + " successfully");
     }
 
     /**
@@ -381,6 +389,16 @@ public class AwsS3ExternalDatasetTest {
         }
     }
 
+    /**
+     * Uploads OVER_1000_OBJECTS_COUNT one-record JSON objects to the S3 mock server under OVER_1000_OBJECTS_PATH.
+     */
+    private static void loadLargeNumberOfFiles() {
+        for (int objectIndex = 0; objectIndex < OVER_1000_OBJECTS_COUNT; objectIndex++) {
+            String objectKey = OVER_1000_OBJECTS_PATH + "/" + objectIndex + ".json";
+            client.putObject(builder.key(objectKey).build(), RequestBody.fromString("{\"id\":" + objectIndex + "}"));
+        }
+    }
+
     static class AwsTestExecutor extends TestExecutor {
 
         public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp
new file mode 100644
index 0000000..a26caeb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+/* Open record type with no declared fields */
+drop type test if exists;
+create type test as open {
+};
+/* External S3 dataset over the JSON objects under 'over-1000-objects' in bucket 'playground' (local endpoint) */
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="over-1000-objects"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp
new file mode 100644
index 0000000..affdb87
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+/* Expected to count every uploaded object, not just the first 1000 returned by one S3 listing */
+select count(*) `count` from test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* Clean up the test dataverse created by the setup DDL */
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm
new file mode 100644
index 0000000..b610b1d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm
@@ -0,0 +1 @@
+{ "count": 2999 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index 551d777..5aa1326 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -67,5 +67,10 @@
         <output-dir compare="Text">aws/s3/empty-string-definition</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/over-1000-objects">
+        <output-dir compare="Text">aws/s3/over-1000-objects</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 58a77b1..9158a57 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -42,8 +42,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public class AwsS3InputStreamFactory implements IInputStreamFactory {
@@ -86,13 +86,35 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory {
         S3Client s3Client = buildAwsS3Client(configuration);
 
         // Get all objects in a bucket and extract the paths to files
-        ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+        ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
         String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME);
         if (path != null) {
             listObjectsBuilder.prefix(path + (!path.isEmpty() && !path.endsWith("/") ? "/" : ""));
         }
-        ListObjectsResponse listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
-        List<S3Object> s3Objects = listObjectsResponse.contents();
+
+        ListObjectsV2Response listObjectsResponse;
+        List<S3Object> s3Objects = new ArrayList<>();
+        boolean done = false;
+        String newMarker = null;
+
+        while (!done) {
+            // List the objects from the start, or from the last marker in case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+            }
+
+            // Collect all the provided objects
+            s3Objects.addAll(listObjectsResponse.contents());
+
+            // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+            if (!listObjectsResponse.isTruncated()) {
+                done = true;
+            } else {
+                newMarker = listObjectsResponse.nextContinuationToken();
+            }
+        }
 
         // Exclude the directories and get the files only
         String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);