Posted to commits@druid.apache.org by "cryptoe (via GitHub)" <gi...@apache.org> on 2023/03/22 06:04:38 UTC

[GitHub] [druid] cryptoe opened a new pull request, #13960: Reworking s3 connector with various improvements:

cryptoe opened a new pull request, #13960:
URL: https://github.com/apache/druid/pull/13960

   ### Description
   
   Reworking the S3 connector with:
   
   1. Adding retries
   2. Adding a max fetch size
   3. Using S3Utils for most of the API calls
   4. Fixing bugs in DurableStorageCleaner
   5. Moving to an Iterator for the listDir call
   
   
   
   ##### Key changed/added classes in this PR
    * `S3StorageConnector`
    * `S3Utils`
    * `DurableStorageCleaner`
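   
   As a usage illustration of the reworked surface, a hedged sketch mirroring the DurableStorageCleaner flow (the helper name `deleteAllFiles` is hypothetical):
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   
   import org.apache.druid.storage.StorageConnector;
   
   // Hedged sketch of a call site: listDir() now returns a lazy Iterator and
   // deleteFiles() batches the deletions in one call.
   static void deleteAllFiles(StorageConnector connector) throws IOException
   {
     final List<String> paths = new ArrayList<>();
     final Iterator<String> files = connector.listDir(""); // listed lazily from the base path
     while (files.hasNext()) {
       paths.add(files.next());
     }
     connector.deleteFiles(paths); // one batched call instead of N deleteFile() calls
   }
   ```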
   
   <hr>
   
   
   
   ### Release notes
   
   Bug fixes in DurableStorageCleaner
   
   
   This PR has:
   
   - [x] been self-reviewed.
   - [x] added documentation for new or modified features or behaviors.
   - [x] a release note entry in the PR description.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] been tested in a test Druid cluster.
   




[GitHub] [druid] LakshSingla commented on a diff in pull request #13960: Reworking s3 connector with various improvements

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1144631455


##########
processing/src/main/java/org/apache/druid/storage/StorageConnector.java:
##########
@@ -105,26 +105,39 @@
    * with a basePath.
    * If the path is a directory, this method throws an exception.
    *
-   * @param path
-   * @throws IOException
+   * @param path to delete
+   * @throws IOException thrown in case of errors.
    */
   void deleteFile(String path) throws IOException;
 
+
+  /**
+   * Delete files present at the input paths. Most implementations prepend all the input paths
+   * with the basePath.
+   * <br/>
+   * This method is <b>recommended</b> in case we need to delete a batch of files.
+   * If the path is a directory, this method throws an exception.
+   *
+   * @param paths Iterable of the paths to delete.
+   * @throws IOException thrown in case of errors.
+   */
+  void deleteFiles(Iterable<String> paths) throws IOException;
+
   /**
    * Delete a directory pointed to by the path and also recursively deletes all files/directories in said directory.
    * Most implementations prepend the input path with a basePath.
    *
    * @param path path
-   * @throws IOException
+   * @throws IOException thrown in case of errors.
    */
   void deleteRecursively(String path) throws IOException;
 
   /**
-   * Returns a list containing all the files present in the path. The returned filenames should be such that joining
+   * Returns a lazy iterator containing all the files present in the path. The returned filenames should be such that joining
    * the dirName and the file name form the full path that can be used as the arguments for other methods of the storage
    * connector.
-   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
-   * "parent1/parent2" should be "child" and for "parent1" should be "parent2"
+   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the input path
+   * "parent1/parent2" should be "child" and for input "parent1" should be "parent2/child"
    */
-  List<String> listDir(String dirName);
+  Iterator<String> listDir(String dirName) throws IOException;

Review Comment:
   Instead of an Iterator, I think it would be better to return an Iterable and let the caller fetch the Iterator from it. The caller might want to iterate over the returned value again without making the list call.
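   
   For illustration, the Iterable shape might look like this (hypothetical sketch; whether a second pass re-lists or replays a cache would be an implementation choice):
   
   ```java
   import java.io.IOException;
   
   class ListingSketch
   {
     // Hypothetical Iterable-returning variant of listDir: callers obtain a
     // fresh Iterator per pass instead of holding a one-shot Iterator.
     interface DirListing
     {
       Iterable<String> listDir(String dirName) throws IOException;
     }
   
     static long countTwice(DirListing connector) throws IOException
     {
       final Iterable<String> files = connector.listDir("dir/");
       long count = 0;
       for (String ignored : files) {
         count++; // first pass
       }
       for (String ignored : files) {
         count++; // second pass, without the caller issuing another listDir()
       }
       return count;
     }
   }
   ```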



##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -121,4 +120,14 @@ public static String getOutputsFileNameForPath(
         path
     );
   }
+
+  /**
+   * Tries to parse out the controller taskID from the input path.
+   * <br></br>
+   * For eg:  for input path <b>controller_query_id/task/123</b> <br/>the function will return <b>controller_query_id</b>
+   */
+  public static String getControllerTaskIdWithPrefixFromPath(String path)
+  {
+    return path.split("/", 1)[0];

Review Comment:
   There should be some checks with an error message shown: `Invalid path provided. Cannot extract the controller id from the path [%s] `
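   
   A hedged sketch of such a check (the IAE usage is an assumption; the split limit of 2 reflects the intended "text before the first slash" behavior, since String.split with a limit of 1 returns the whole input as one element):
   
   ```java
   import org.apache.druid.java.util.common.IAE;
   
   public static String getControllerTaskIdWithPrefixFromPath(String path)
   {
     if (path == null || path.isEmpty()) {
       throw new IAE("Invalid path provided. Cannot extract the controller id from the path [%s]", path);
     }
     return path.split("/", 2)[0];
   }
   ```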



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -243,59 +243,73 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS
    * Delete the files from S3 in a specified bucket, matching a specified prefix and filter
    *
    * @param s3Client s3 client
-   * @param config   specifies the configuration to use when finding matching files in S3 to delete
+   * @param maxListingLength  maximum number of keys to fetch and delete at a time
    * @param bucket   s3 bucket
    * @param prefix   the file prefix
    * @param filter   function which returns true if the prefix file found should be deleted and false otherwise.
    *
-   * @throws Exception
+   * @throws Exception in case of errors
    */
+
   public static void deleteObjectsInPath(
       ServerSideEncryptingAmazonS3 s3Client,
-      S3InputDataConfig config,
+      int maxListingLength,
       String bucket,
       String prefix,
       Predicate<S3ObjectSummary> filter
   )
       throws Exception
   {
-    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
+    deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
+  }
+
+  public static void deleteObjectsInPath(
+      ServerSideEncryptingAmazonS3 s3Client,
+      int maxListingLength,
+      String bucket,
+      String prefix,
+      Predicate<S3ObjectSummary> filter,
+      int maxRetries
+  )
+      throws Exception
+  {
+    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
     final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
         s3Client,
         ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
-        config.getMaxListingLength()
+        maxListingLength
     );
 
     while (iterator.hasNext()) {
       final S3ObjectSummary nextObject = iterator.next();
       if (filter.apply(nextObject)) {
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
-        if (keysToDelete.size() == config.getMaxListingLength()) {
-          deleteBucketKeys(s3Client, bucket, keysToDelete);
-          log.info("Deleted %d files", keysToDelete.size());
+        if (keysToDelete.size() == maxListingLength) {
+          deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);

Review Comment:
   Here, for each batch of `maxListingLength` keys, we retry the operation up to `maxRetries` times if it fails, which amounts to a total of `maxListingLength * maxRetries` retries.
   I think it would be better if `deleteBucketKeys` didn't retry on its own and we had a top-level retry mechanism, something like:
   ```
   RetryUtils.retry(
      // Body of the function
   // deleteBucketKeys(s3Client, bucket, keysToDelete, 1);  // Do not retry the deleteBucketKeys() individually
      // Remaining retriable body of the function
   )
   ```
   WDYT?
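   
   Concretely, the suggested shape might look like this (a hedged sketch; `RetryUtils.retry(Task, Predicate, maxTries)` and the `S3Utils.S3RETRY` predicate are assumed from Druid's existing utilities):
   
   ```java
   import java.util.List;
   
   import com.amazonaws.services.s3.model.DeleteObjectsRequest;
   import org.apache.druid.java.util.common.RetryUtils;
   
   // One top-level retry around the whole batch delete; deleteBucketKeys
   // itself is told not to retry, avoiding compounded retries per batch.
   static void deleteBatchWithTopLevelRetry(
       ServerSideEncryptingAmazonS3 s3Client,
       String bucket,
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
       int maxRetries
   ) throws Exception
   {
     RetryUtils.retry(
         () -> {
           deleteBucketKeys(s3Client, bucket, keysToDelete, 1); // do not retry individually
           return null;
         },
         S3Utils.S3RETRY,
         maxRetries
     );
   }
   ```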



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########


Review Comment:
   For the functions that have retries added, we should update the Javadocs to mention that they are retried up to `config.getMaxRetry()` times. Or we should update the class-level Javadoc to mention that all the functions are retriable.
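   
   For instance, a class-level note along these lines (illustrative wording only):
   
   ```java
   /**
    * Implementation of {@link StorageConnector} backed by S3. All remote S3
    * calls made by this connector are retried up to {@code config.getMaxRetry()}
    * times before the failure is propagated as an {@link IOException}.
    */
   ```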



##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java:
##########
@@ -214,21 +237,33 @@ public void pathDeleteRecursively() throws IOException
   }
 
   @Test
-  public void testListDir()
+  public void testListDir() throws IOException
   {
     EasyMock.reset(S3_CLIENT, TEST_RESULT);
 
     S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
     s3ObjectSummary.setBucketName(BUCKET);
     s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
+    s3ObjectSummary.setSize(1);
 
     EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
     EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
+    EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null);
     EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
             .andReturn(TEST_RESULT);
     EasyMock.replay(S3_CLIENT, TEST_RESULT);
 
-    List<String> listDirResult = storageConnector.listDir("/");
+    List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/"));
     Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
   }
+
+  private String convetDelReqToString(DeleteObjectsRequest deleteObjectsRequest)

Review Comment:
   ```suggestion
     private String convetDeleteObjectsRequestToString(DeleteObjectsRequest deleteObjectsRequest)
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
               return;
             }
             TaskRunner taskRunner = taskRunnerOptional.get();
-            Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+            Iterator<String> allFiles = storageConnector.listDir("");
             Set<String> runningTaskIds = taskRunner.getRunningTasks()
                                                    .stream()
                                                    .map(TaskRunnerWorkItem::getTaskId)
                                                    .map(DurableStorageUtils::getControllerDirectory)
                                                    .collect(Collectors.toSet());
-            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
-            LOG.info(
-                "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
-                unknownDirectories
-            );
-            for (String unknownDirectory : unknownDirectories) {
-              storageConnector.deleteRecursively(unknownDirectory);
+
+            Set<String> filesToRemove = new HashSet<>();
+            while (allFiles.hasNext()) {
+              String currentFile = allFiles.next();
+              String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+              if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+                if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {

Review Comment:
   nit: Can collapse this to something like
   ```suggestion
                   if (!runningTaskIds.contains(taskIdFromPathOrEmpty)) {
                        // filesToRemove.add(currentFile);
                   }
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
               return;
             }
             TaskRunner taskRunner = taskRunnerOptional.get();
-            Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+            Iterator<String> allFiles = storageConnector.listDir("");
             Set<String> runningTaskIds = taskRunner.getRunningTasks()
                                                    .stream()
                                                    .map(TaskRunnerWorkItem::getTaskId)
                                                    .map(DurableStorageUtils::getControllerDirectory)
                                                    .collect(Collectors.toSet());
-            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
-            LOG.info(
-                "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
-                unknownDirectories
-            );
-            for (String unknownDirectory : unknownDirectories) {
-              storageConnector.deleteRecursively(unknownDirectory);
+
+            Set<String> filesToRemove = new HashSet<>();
+            while (allFiles.hasNext()) {
+              String currentFile = allFiles.next();
+              String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+              if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+                if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+                  // do nothing
+                } else {
+                  filesToRemove.add(currentFile);
+                }
+              }
+            }
+            if (filesToRemove.isEmpty()) {
+              LOG.info("Nothing to delete");

Review Comment:
   This log message could be more descriptive about what "nothing" refers to.
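   
   For example (illustrative wording only):
   
   ```java
   LOG.info("DurableStorageCleaner did not find any unknown files or directories to delete");
   ```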



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -237,71 +266,95 @@ public OutputStream write(String path) throws IOException
   }
 
   @Override
-  public void deleteFile(String path)
+  public void deleteFile(String path) throws IOException
   {
-    s3Client.deleteObject(config.getBucket(), objectPath(path));
+    try {
+      S3Utils.retryS3Operation(() -> {
+        s3Client.deleteObject(config.getBucket(), objectPath(path));
+        return null;
+      }, config.getMaxRetry());
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void deleteRecursively(String dirName)
+  public void deleteFiles(Iterable<String> paths) throws IOException
   {
-    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
-        .withBucketName(config.getBucket())
-        .withPrefix(objectPath(dirName));
-    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
-    while (objectListing.getObjectSummaries().size() > 0) {
-      List<DeleteObjectsRequest.KeyVersion> deleteObjectsRequestKeys = objectListing.getObjectSummaries()
-                                                                                    .stream()
-                                                                                    .map(S3ObjectSummary::getKey)
-                                                                                    .map(DeleteObjectsRequest.KeyVersion::new)
-                                                                                    .collect(Collectors.toList());
-      DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(config.getBucket()).withKeys(
-          deleteObjectsRequestKeys);
-      s3Client.deleteObjects(deleteObjectsRequest);
+    int currentItemSize = 0;
+    List<DeleteObjectsRequest.KeyVersion> versions = new ArrayList<>();
 
-      // If the listing is truncated, all S3 objects have been deleted, otherwise, fetch more using the continuation token
-      if (objectListing.isTruncated()) {
-        listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
-        objectListing = s3Client.listObjectsV2(listObjectsRequest);
-      } else {
-        break;
+    for (String path : paths) {
+      // appending base path to each path
+      versions.add(new DeleteObjectsRequest.KeyVersion(objectPath(path)));
+      currentItemSize++;
+      if (currentItemSize == MAX_NUMBER_OF_LISTINGS) {
+        deleteKeys(versions);
+        // resetting trackers
+        versions.clear();
+        currentItemSize = 0;
       }
     }
+    // deleting remaining elements
+    if (currentItemSize != 0) {
+      deleteKeys(versions);
+    }
   }
 
-  @Override
-  public List<String> listDir(String dirName)
+  private void deleteKeys(List<DeleteObjectsRequest.KeyVersion> versions) throws IOException

Review Comment:
   Should indicate in the method signature that this is a retriable operation. Also, same comment as above about retrying being a top-level operation rather than per individual batch request.





[GitHub] [druid] cryptoe merged pull request #13960: Reworking s3 connector with various improvements

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe merged PR #13960:
URL: https://github.com/apache/druid/pull/13960




[GitHub] [druid] cryptoe commented on a diff in pull request #13960: Reworking s3 connector with various improvements

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1146285777


##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -121,4 +120,14 @@ public static String getOutputsFileNameForPath(
         path
     );
   }
+
+  /**
+   * Tries to parse out the controller taskID from the input path.
+   * <br></br>
+   * For eg:  for input path <b>controller_query_id/task/123</b> <br/>the function will return <b>controller_query_id</b>
+   */
+  public static String getControllerTaskIdWithPrefixFromPath(String path)
+  {
+    return path.split("/", 1)[0];

Review Comment:
   I added more Javadocs and test cases to this method.
   We cannot throw an error here since we need the method to return whatever is found before "/" so that the durable storage cleaner can make further decisions.





[GitHub] [druid] cryptoe commented on a diff in pull request #13960: Reworking s3 connector with various improvements

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1146289129


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -243,59 +243,73 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS
    * Delete the files from S3 in a specified bucket, matching a specified prefix and filter
    *
    * @param s3Client s3 client
-   * @param config   specifies the configuration to use when finding matching files in S3 to delete
+   * @param maxListingLength  maximum number of keys to fetch and delete at a time
    * @param bucket   s3 bucket
    * @param prefix   the file prefix
    * @param filter   function which returns true if the prefix file found should be deleted and false otherwise.
    *
-   * @throws Exception
+   * @throws Exception in case of errors
    */
+
   public static void deleteObjectsInPath(
       ServerSideEncryptingAmazonS3 s3Client,
-      S3InputDataConfig config,
+      int maxListingLength,
       String bucket,
       String prefix,
       Predicate<S3ObjectSummary> filter
   )
       throws Exception
   {
-    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
+    deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
+  }
+
+  public static void deleteObjectsInPath(
+      ServerSideEncryptingAmazonS3 s3Client,
+      int maxListingLength,
+      String bucket,
+      String prefix,
+      Predicate<S3ObjectSummary> filter,
+      int maxRetries
+  )
+      throws Exception
+  {
+    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
     final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
         s3Client,
         ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
-        config.getMaxListingLength()
+        maxListingLength
     );
 
     while (iterator.hasNext()) {
       final S3ObjectSummary nextObject = iterator.next();
       if (filter.apply(nextObject)) {
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
-        if (keysToDelete.size() == config.getMaxListingLength()) {
-          deleteBucketKeys(s3Client, bucket, keysToDelete);
-          log.info("Deleted %d files", keysToDelete.size());
+        if (keysToDelete.size() == maxListingLength) {
+          deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);

Review Comment:
   I think each S3 remote call should be retried, not the abstractions we build on top of it.
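   
   In code, the per-call pattern (a sketch, assuming the deleteBucketKeys shape in this PR and the retryS3Operation overload it adds) would look roughly like:
   
   ```java
   import java.util.List;
   
   import com.amazonaws.services.s3.model.DeleteObjectsRequest;
   
   // The single remote call is the thing that gets retried, mirroring the
   // S3Utils.retryS3Operation(...) usage in S3StorageConnector above.
   static void deleteBucketKeys(
       ServerSideEncryptingAmazonS3 s3Client,
       String bucket,
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
       int retries
   ) throws Exception
   {
     final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
     S3Utils.retryS3Operation(() -> {
       s3Client.deleteObjects(deleteRequest); // only this remote call is retried
       return null;
     }, retries);
   }
   ```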
   





[GitHub] [druid] cryptoe commented on a diff in pull request #13960: Reworking s3 connector with various improvements

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1146101160


##########
processing/src/main/java/org/apache/druid/storage/StorageConnector.java:
##########
@@ -105,26 +105,39 @@
    * with a basePath.
    * If the path is a directory, this method throws an exception.
    *
-   * @param path
-   * @throws IOException
+   * @param path to delete
+   * @throws IOException thrown in case of errors.
    */
   void deleteFile(String path) throws IOException;
 
+
+  /**
+   * Delete files present at the input paths. Most implementations prepend all the input paths
+   * with the basePath.
+   * <br/>
+   * This method is <b>recommended</b> in case we need to delete a batch of files.
+   * If the path is a directory, this method throws an exception.
+   *
+   * @param paths Iterable of the paths to delete.
+   * @throws IOException thrown in case of errors.
+   */
+  void deleteFiles(Iterable<String> paths) throws IOException;
+
   /**
    * Delete a directory pointed to by the path and also recursively deletes all files/directories in said directory.
    * Most implementations prepend the input path with a basePath.
    *
    * @param path path
-   * @throws IOException
+   * @throws IOException thrown in case of errors.
    */
   void deleteRecursively(String path) throws IOException;
 
   /**
-   * Returns a list containing all the files present in the path. The returned filenames should be such that joining
+   * Returns a lazy iterator containing all the files present in the path. The returned filenames should be such that joining
    * the dirName and the file name form the full path that can be used as the arguments for other methods of the storage
    * connector.
-   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
-   * "parent1/parent2" should be "child" and for "parent1" should be "parent2"
+   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the input path
+   * "parent1/parent2" should be "child" and for input "parent1" should be "parent2/child"
    */
-  List<String> listDir(String dirName);
+  Iterator<String> listDir(String dirName) throws IOException;

Review Comment:
   Since we do not want to materialize all the returned items in memory, I used an iterator so that on hasNext() we can fetch the next items if required.
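   
   A minimal sketch of that lazy paging (a hypothetical helper, not the PR's exact implementation):
   
   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   
   import com.amazonaws.services.s3.model.ListObjectsV2Request;
   import com.amazonaws.services.s3.model.ListObjectsV2Result;
   import com.amazonaws.services.s3.model.S3ObjectSummary;
   
   // Fetches the next page of keys from S3 only when the current page is
   // exhausted, so the full listing is never materialized in memory.
   class LazyS3KeyIterator implements Iterator<String>
   {
     private final ServerSideEncryptingAmazonS3 s3Client;
     private final ListObjectsV2Request request;
     private Iterator<S3ObjectSummary> page;
     private String continuationToken = null;
     private boolean lastPage = false;
   
     LazyS3KeyIterator(ServerSideEncryptingAmazonS3 s3Client, String bucket, String prefix, int maxKeys)
     {
       this.s3Client = s3Client;
       this.request = new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix).withMaxKeys(maxKeys);
       fetchNextPage();
     }
   
     private void fetchNextPage()
     {
       request.withContinuationToken(continuationToken);
       final ListObjectsV2Result result = s3Client.listObjectsV2(request);
       page = result.getObjectSummaries().iterator();
       continuationToken = result.getNextContinuationToken();
       lastPage = !result.isTruncated();
     }
   
     @Override
     public boolean hasNext()
     {
       while (!page.hasNext() && !lastPage) {
         fetchNextPage(); // remote call happens here, on demand
       }
       return page.hasNext();
     }
   
     @Override
     public String next()
     {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       return page.next().getKey();
     }
   }
   ```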





[GitHub] [druid] cryptoe commented on a diff in pull request #13960: Reworking s3 connector with various improvements

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1146292211


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -237,71 +266,95 @@ public OutputStream write(String path) throws IOException
   }
 
   @Override
-  public void deleteFile(String path)
+  public void deleteFile(String path) throws IOException
   {
-    s3Client.deleteObject(config.getBucket(), objectPath(path));
+    try {
+      S3Utils.retryS3Operation(() -> {
+        s3Client.deleteObject(config.getBucket(), objectPath(path));
+        return null;
+      }, config.getMaxRetry());
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void deleteRecursively(String dirName)
+  public void deleteFiles(Iterable<String> paths) throws IOException
   {
-    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
-        .withBucketName(config.getBucket())
-        .withPrefix(objectPath(dirName));
-    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
-    while (objectListing.getObjectSummaries().size() > 0) {
-      List<DeleteObjectsRequest.KeyVersion> deleteObjectsRequestKeys = objectListing.getObjectSummaries()
-                                                                                    .stream()
-                                                                                    .map(S3ObjectSummary::getKey)
-                                                                                    .map(DeleteObjectsRequest.KeyVersion::new)
-                                                                                    .collect(Collectors.toList());
-      DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(config.getBucket()).withKeys(
-          deleteObjectsRequestKeys);
-      s3Client.deleteObjects(deleteObjectsRequest);
+    int currentItemSize = 0;
+    List<DeleteObjectsRequest.KeyVersion> versions = new ArrayList<>();
 
-      // If the listing is truncated, all S3 objects have been deleted, otherwise, fetch more using the continuation token
-      if (objectListing.isTruncated()) {
-        listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
-        objectListing = s3Client.listObjectsV2(listObjectsRequest);
-      } else {
-        break;
+    for (String path : paths) {
+      // appending base path to each path
+      versions.add(new DeleteObjectsRequest.KeyVersion(objectPath(path)));
+      currentItemSize++;
+      if (currentItemSize == MAX_NUMBER_OF_LISTINGS) {
+        deleteKeys(versions);
+        // resetting trackers
+        versions.clear();
+        currentItemSize = 0;
       }
     }
+    // deleting remaining elements
+    if (currentItemSize != 0) {
+      deleteKeys(versions);
+    }
   }
 
-  @Override
-  public List<String> listDir(String dirName)
+  private void deleteKeys(List<DeleteObjectsRequest.KeyVersion> versions) throws IOException

Review Comment:
   I prefer to keep the names free of "retry" since that is more of an implementation detail.


