Posted to commits@druid.apache.org by "rohangarg (via GitHub)" <gi...@apache.org> on 2023/05/09 15:30:39 UTC

[GitHub] [druid] rohangarg commented on a diff in pull request #14131: Speed up kill tasks by deleting segments in batch

rohangarg commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1188780058


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +73,69 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), BUCKET);
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
+            .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
+            .flatMap(path -> Stream.of(new DeleteObjectsRequest.KeyVersion(path),
+                                     new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
+            .collect(Collectors.toList());
+
+    // max delete object request size is 1000 for S3
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(keysToDelete, 1000);
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+    // only return objects failed to delete.
+    deleteObjectsRequest.setQuiet(true);
+
+    List<String> keysNotDeleted = new ArrayList<>();
+    for (List<DeleteObjectsRequest.KeyVersion> keysChunk : keysChunks) {
+      List<String> keysToDeleteStrings = keysChunk.stream().map(
+            DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+      try {
+        deleteObjectsRequest.setKeys(keysChunk);
+        log.info("Removing from bucket: [%s] the following index files: [%s] from s3!", s3Bucket, keysToDeleteStrings);
+        s3Client.deleteObjects(deleteObjectsRequest);
+      }
+      catch (MultiObjectDeleteException e)
+      {
+        keysNotDeleted.addAll(e.getErrors().stream()
+         .map(MultiObjectDeleteException.DeleteError::getKey)
+         .collect(Collectors.toList()));
+      }
+      catch (AmazonServiceException e)
+      {
+        throw new SegmentLoadingException(e,

Review Comment:
   It might be better to make the message something like `Couldn't completely kill segments : [%s] due to [%s]`, as is done in the single-segment `kill`.
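For illustration, a minimal sketch of that suggestion (not the PR's code): it reuses the `SegmentLoadingException(Throwable, String, Object...)` constructor already used in this diff, and the class, method, and parameter names below are purely illustrative.

```java
import com.amazonaws.AmazonServiceException;
import org.apache.druid.segment.loading.SegmentLoadingException;

import java.util.List;

// Hedged sketch only: build the batch-kill failure so it carries both the affected
// keys and the underlying cause, mirroring the single-segment kill() message style.
final class BatchKillError
{
  private BatchKillError() {}

  static SegmentLoadingException of(List<String> keys, AmazonServiceException cause)
  {
    return new SegmentLoadingException(
        cause,
        "Couldn't completely kill segments : [%s] due to [%s]",
        keys,
        cause.getMessage()
    );
  }
}
```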



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +73,69 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), BUCKET);
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
+            .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
+            .flatMap(path -> Stream.of(new DeleteObjectsRequest.KeyVersion(path),
+                                     new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
+            .collect(Collectors.toList());
+
+    // max delete object request size is 1000 for S3
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(keysToDelete, 1000);
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+    // only return objects failed to delete.
+    deleteObjectsRequest.setQuiet(true);
+
+    List<String> keysNotDeleted = new ArrayList<>();
+    for (List<DeleteObjectsRequest.KeyVersion> keysChunk : keysChunks) {
+      List<String> keysToDeleteStrings = keysChunk.stream().map(
+            DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+      try {
+        deleteObjectsRequest.setKeys(keysChunk);
+        log.info("Removing from bucket: [%s] the following index files: [%s] from s3!", s3Bucket, keysToDeleteStrings);
+        s3Client.deleteObjects(deleteObjectsRequest);

Review Comment:
   can we re-use `S3Utils.deleteBucketKeys` here? 
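A rough sketch of what that reuse might look like (not the PR's code). The `S3Utils.deleteBucketKeys(s3Client, bucket, keys)` signature and its throwing of `MultiObjectDeleteException` on partial failure are assumptions here; the real helper may be private or shaped differently, and the wrapper class exists only to keep the example self-contained.

```java
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

import java.util.ArrayList;
import java.util.List;

// Hedged sketch only: delegate each 1000-key chunk to the shared S3Utils helper instead
// of assembling the quiet DeleteObjectsRequest inside S3DataSegmentKiller itself.
final class BatchDeleteSketch
{
  private BatchDeleteSketch() {}

  static List<String> deleteChunks(
      ServerSideEncryptingAmazonS3 s3Client,
      String s3Bucket,
      List<List<DeleteObjectsRequest.KeyVersion>> keysChunks
  )
  {
    final List<String> keysNotDeleted = new ArrayList<>();
    for (List<DeleteObjectsRequest.KeyVersion> keysChunk : keysChunks) {
      try {
        // Assumed helper signature; see the note above.
        S3Utils.deleteBucketKeys(s3Client, s3Bucket, keysChunk);
      }
      catch (MultiObjectDeleteException e) {
        // Keep only the keys that actually failed, as the quiet-mode request does.
        for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
          keysNotDeleted.add(error.getKey());
        }
      }
    }
    return keysNotDeleted;
  }
}
```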



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +73,69 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), BUCKET);
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
+            .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
+            .flatMap(path -> Stream.of(new DeleteObjectsRequest.KeyVersion(path),
+                                     new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
+            .collect(Collectors.toList());
+
+    // max delete object request size is 1000 for S3
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(keysToDelete, 1000);
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+    // only return objects failed to delete.
+    deleteObjectsRequest.setQuiet(true);
+
+    List<String> keysNotDeleted = new ArrayList<>();
+    for (List<DeleteObjectsRequest.KeyVersion> keysChunk : keysChunks) {
+      List<String> keysToDeleteStrings = keysChunk.stream().map(
+            DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+      try {
+        deleteObjectsRequest.setKeys(keysChunk);
+        log.info("Removing from bucket: [%s] the following index files: [%s] from s3!", s3Bucket, keysToDeleteStrings);
+        s3Client.deleteObjects(deleteObjectsRequest);
+      }
+      catch (MultiObjectDeleteException e)
+      {
+        keysNotDeleted.addAll(e.getErrors().stream()
+         .map(MultiObjectDeleteException.DeleteError::getKey)
+         .collect(Collectors.toList()));
+      }
+      catch (AmazonServiceException e)
+      {
+        throw new SegmentLoadingException(e,
+                                          "Unable to delete from bucket [%s]",
+                                          s3Bucket);
+      }
+    }
+    if (!keysNotDeleted.isEmpty()) {
+      throw new SegmentLoadingException("Couldn't delete from bucket: [%s] these files [%s]", s3Bucket, keysNotDeleted);

Review Comment:
   The same comment about the message applies here.
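Again only a sketch of the suggested wording for the final check, assuming the same message style; `keysNotDeleted` and `s3Bucket` correspond to the locals in the diff above, and the wrapper class is illustrative.

```java
import org.apache.druid.segment.loading.SegmentLoadingException;

import java.util.List;

// Hedged sketch only: fail the kill task with the single-kill style message, listing
// the keys that could not be deleted after all chunks were attempted.
final class BatchKillCheck
{
  private BatchKillCheck() {}

  static void throwIfAnyLeft(String s3Bucket, List<String> keysNotDeleted)
      throws SegmentLoadingException
  {
    if (!keysNotDeleted.isEmpty()) {
      throw new SegmentLoadingException(
          "Couldn't completely kill segments : [%s] from bucket [%s]",
          keysNotDeleted,
          s3Bucket
      );
    }
  }
}
```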



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

