You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/28 11:35:28 UTC

[druid] branch master updated: Reworking s3 connector with various improvements (#13960)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c2fe6a4956 Reworking s3 connector with various improvements (#13960)
c2fe6a4956 is described below

commit c2fe6a4956de416ce7e91fa82847dd4219ffd50a
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Tue Mar 28 17:05:16 2023 +0530

    Reworking s3 connector with various improvements (#13960)
    
    * Reworking the S3 connector with:
    1. Adding retries
    2. Adding a max fetch size
    3. Using S3Utils for most of the API calls
    4. Fixing bugs in DurableStorageCleaner
    5. Moving to an Iterator for the listDir call
---
 .../druid/msq/indexing/DurableStorageCleaner.java  |  40 +++--
 .../msq/indexing/DurableStorageCleanerTest.java    |  65 ++++----
 .../druid/storage/s3/S3DataSegmentKiller.java      |   2 +-
 .../org/apache/druid/storage/s3/S3TaskLogs.java    |   2 +-
 .../java/org/apache/druid/storage/s3/S3Utils.java  |  44 +++--
 .../storage/s3/output/S3StorageConnector.java      | 177 ++++++++++++++-------
 .../storage/s3/output/S3StorageConnectorTest.java  |  49 +++++-
 .../druid/frame/util/DurableStorageUtils.java      |  33 +++-
 .../org/apache/druid/storage/StorageConnector.java |  29 +++-
 .../storage/local/LocalFileStorageConnector.java   |  37 +++--
 .../druid/frame/util/DurableStorageUtilsTest.java  |  38 +++++
 .../local/LocalFileStorageConnectorTest.java       |  30 ++--
 12 files changed, 396 insertions(+), 150 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
index b89af76112..4d7a076020 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.indexing;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import org.apache.druid.frame.util.DurableStorageUtils;
@@ -37,6 +36,7 @@ import org.joda.time.Duration;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
@@ -93,19 +93,41 @@ public class DurableStorageCleaner implements OverlordHelper
               return;
             }
             TaskRunner taskRunner = taskRunnerOptional.get();
-            Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+            Iterator<String> allFiles = storageConnector.listDir("");
             Set<String> runningTaskIds = taskRunner.getRunningTasks()
                                                    .stream()
                                                    .map(TaskRunnerWorkItem::getTaskId)
                                                    .map(DurableStorageUtils::getControllerDirectory)
                                                    .collect(Collectors.toSet());
-            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
-            LOG.info(
-                "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
-                unknownDirectories
-            );
-            for (String unknownDirectory : unknownDirectories) {
-              storageConnector.deleteRecursively(unknownDirectory);
+
+            Set<String> filesToRemove = new HashSet<>();
+            while (allFiles.hasNext()) {
+              String currentFile = allFiles.next();
+              String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+              if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+                if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+                  // do nothing
+                } else {
+                  filesToRemove.add(currentFile);
+                }
+              }
+            }
+            if (filesToRemove.isEmpty()) {
+              LOG.info("DurableStorageCleaner did not find any left over directories to delete");
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Number of files [%d] that do not have a corresponding MSQ task associated with it. These are:\n[%s]\nT",
+                    filesToRemove.size(),
+                    filesToRemove
+                );
+              } else {
+                LOG.info(
+                    "Number of files [%d] that do not have a corresponding MSQ task associated with it.",
+                    filesToRemove.size()
+                );
+              }
+              storageConnector.deleteFiles(filesToRemove);
             }
           }
           catch (IOException e) {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
index 664bd8f9a2..e466acb83f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java
@@ -21,8 +21,8 @@ package org.apache.druid.msq.indexing;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.apache.druid.frame.util.DurableStorageUtils;
-import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
@@ -34,7 +34,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 
 public class DurableStorageCleanerTest
@@ -44,39 +46,44 @@ public class DurableStorageCleanerTest
   private static final TaskRunner TASK_RUNNER = EasyMock.mock(TaskRunner.class);
   private static final StorageConnector STORAGE_CONNECTOR = EasyMock.mock(StorageConnector.class);
   private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM = EasyMock.mock(TaskRunnerWorkItem.class);
-  private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
   private static final String TASK_ID = "dummyTaskId";
   private static final String STRAY_DIR = "strayDirectory";
 
   @Test
   public void testSchedule() throws IOException, InterruptedException
   {
-    EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
-    DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
-    durableStorageCleanerConfig.delaySeconds = 1L;
-    durableStorageCleanerConfig.enabled = true;
-    DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
-        durableStorageCleanerConfig,
-        STORAGE_CONNECTOR,
-        () -> TASK_MASTER
-    );
-    EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
-            .andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), "strayDirectory"))
-            .anyTimes();
-    EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
-            .anyTimes();
-    EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
-            .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
-            .anyTimes();
-    EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
-    Capture<String> capturedArguments = EasyMock.newCapture();
-    STORAGE_CONNECTOR.deleteRecursively(EasyMock.capture(capturedArguments));
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
-
-
-    durableStorageCleaner.schedule(Executors.newSingleThreadScheduledExecutor());
-    Thread.sleep(8000L);
-    Assert.assertEquals(STRAY_DIR, capturedArguments.getValue());
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    try {
+      EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+      DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
+      durableStorageCleanerConfig.delaySeconds = 1L;
+      durableStorageCleanerConfig.enabled = true;
+      DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
+          durableStorageCleanerConfig,
+          STORAGE_CONNECTOR,
+          () -> TASK_MASTER
+      );
+      EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
+              .andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR)
+                                      .stream()
+                                      .iterator())
+              .anyTimes();
+      EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
+              .anyTimes();
+      EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
+              .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
+              .anyTimes();
+      EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
+      Capture<Set<String>> capturedArguments = EasyMock.newCapture();
+      STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
+      EasyMock.expectLastCall().once();
+      EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
+      durableStorageCleaner.schedule(executor);
+      Thread.sleep(8000L);
+      Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
+    }
+    finally {
+      executor.shutdownNow();
+    }
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
index df2f7439ba..d06583630d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
@@ -103,7 +103,7 @@ public class S3DataSegmentKiller implements DataSegmentKiller
     try {
       S3Utils.deleteObjectsInPath(
           s3ClientSupplier.get(),
-          inputDataConfig,
+          inputDataConfig.getMaxListingLength(),
           segmentPusherConfig.getBucket(),
           segmentPusherConfig.getBaseKey(),
           Predicates.alwaysTrue()
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index a28fd1651c..27544e1eec 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -187,7 +187,7 @@ public class S3TaskLogs implements TaskLogs
     try {
       S3Utils.deleteObjectsInPath(
           service,
-          inputDataConfig,
+          inputDataConfig.getMaxListingLength(),
           config.getS3Bucket(),
           config.getS3Prefix(),
           (object) -> object.getLastModified().getTime() < timestamp
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index a017698497..21d06aad8f 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -93,7 +93,7 @@ public class S3Utils
    * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
    * found, etc) are not retried.
    */
-  static <T> T retryS3Operation(Task<T> f) throws Exception
+  public static <T> T retryS3Operation(Task<T> f) throws Exception
   {
     return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
   }
@@ -102,7 +102,7 @@ public class S3Utils
    * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
    * found, etc) are not retried. Also provide a way to set maxRetries that can be useful, i.e. for testing.
    */
-  static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
+  public static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
   {
     return RetryUtils.retry(f, S3RETRY, maxRetries);
   }
@@ -243,51 +243,64 @@ public class S3Utils
    * Delete the files from S3 in a specified bucket, matching a specified prefix and filter
    *
    * @param s3Client s3 client
-   * @param config   specifies the configuration to use when finding matching files in S3 to delete
+   * @param maxListingLength  maximum number of keys to fetch and delete at a time
    * @param bucket   s3 bucket
    * @param prefix   the file prefix
    * @param filter   function which returns true if the prefix file found should be deleted and false otherwise.
    *
-   * @throws Exception
+   * @throws Exception in case of errors
    */
+
   public static void deleteObjectsInPath(
       ServerSideEncryptingAmazonS3 s3Client,
-      S3InputDataConfig config,
+      int maxListingLength,
       String bucket,
       String prefix,
       Predicate<S3ObjectSummary> filter
   )
       throws Exception
   {
-    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
+    deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
+  }
+
+  public static void deleteObjectsInPath(
+      ServerSideEncryptingAmazonS3 s3Client,
+      int maxListingLength,
+      String bucket,
+      String prefix,
+      Predicate<S3ObjectSummary> filter,
+      int maxRetries
+  )
+      throws Exception
+  {
+    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
     final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
         s3Client,
         ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
-        config.getMaxListingLength()
+        maxListingLength
     );
 
     while (iterator.hasNext()) {
       final S3ObjectSummary nextObject = iterator.next();
       if (filter.apply(nextObject)) {
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
-        if (keysToDelete.size() == config.getMaxListingLength()) {
-          deleteBucketKeys(s3Client, bucket, keysToDelete);
-          log.info("Deleted %d files", keysToDelete.size());
+        if (keysToDelete.size() == maxListingLength) {
+          deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);
           keysToDelete.clear();
         }
       }
     }
 
     if (keysToDelete.size() > 0) {
-      deleteBucketKeys(s3Client, bucket, keysToDelete);
-      log.info("Deleted %d files", keysToDelete.size());
+      deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);
     }
   }
 
-  private static void deleteBucketKeys(
+  public static void deleteBucketKeys(
       ServerSideEncryptingAmazonS3 s3Client,
       String bucket,
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      int retries
   )
       throws Exception
   {
@@ -295,7 +308,8 @@ public class S3Utils
     S3Utils.retryS3Operation(() -> {
       s3Client.deleteObjects(deleteRequest);
       return null;
-    });
+    }, retries);
+    log.info("Deleted %d files", keysToDelete.size());
   }
 
   /**
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 66391ead4d..c72f954401 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -21,12 +21,13 @@ package org.apache.druid.storage.s3.output;
 
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
 import org.apache.druid.java.util.common.FileUtils;
@@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -48,14 +50,18 @@ import java.io.OutputStream;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
+/**
+ * In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times.
+ */
 public class S3StorageConnector implements StorageConnector
 {
+  private static final Logger log = new Logger(S3StorageConnector.class);
 
   private final S3OutputConfig config;
   private final ServerSideEncryptingAmazonS3 s3Client;
@@ -63,6 +69,7 @@ public class S3StorageConnector implements StorageConnector
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
   private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
+  private static final int MAX_NUMBER_OF_LISTINGS = 1000;
 
   public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
@@ -82,9 +89,18 @@ public class S3StorageConnector implements StorageConnector
   }
 
   @Override
-  public boolean pathExists(String path)
+  public boolean pathExists(String path) throws IOException
   {
-    return s3Client.doesObjectExist(config.getBucket(), objectPath(path));
+    try {
+      return S3Utils.retryS3Operation(
+          () -> s3Client.doesObjectExist(config.getBucket(), objectPath(path)),
+          config.getMaxRetry()
+      );
+    }
+    catch (Exception e) {
+      log.error("Error occurred while checking if file [%s] exists. Error: [%s]", path, e.getMessage());
+      throw new IOException(e);
+    }
   }
 
   @Override
@@ -119,7 +135,15 @@ public class S3StorageConnector implements StorageConnector
       currReadStart.set(getObjectRequest.getRange()[0]);
       readEnd = getObjectRequest.getRange()[1] + 1;
     } else {
-      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+      try {
+        readEnd = S3Utils.retryS3Operation(
+            () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(),
+            config.getMaxRetry()
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
     AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
 
@@ -166,7 +190,15 @@ public class S3StorageConnector implements StorageConnector
                     @Override
                     public InputStream open(GetObjectRequest object)
                     {
-                      return s3Client.getObject(object).getObjectContent();
+                      try {
+                        return S3Utils.retryS3Operation(
+                            () -> s3Client.getObject(object).getObjectContent(),
+                            config.getMaxRetry()
+                        );
+                      }
+                      catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
                     }
 
                     @Override
@@ -182,7 +214,7 @@ public class S3StorageConnector implements StorageConnector
                     }
                   },
                   S3Utils.S3RETRY,
-                  3
+                  config.getMaxRetry()
               ),
               outFile,
               new byte[8 * 1024],
@@ -237,71 +269,102 @@ public class S3StorageConnector implements StorageConnector
   }
 
   @Override
-  public void deleteFile(String path)
+  public void deleteFile(String path) throws IOException
   {
-    s3Client.deleteObject(config.getBucket(), objectPath(path));
+    try {
+      S3Utils.retryS3Operation(() -> {
+        s3Client.deleteObject(config.getBucket(), objectPath(path));
+        return null;
+      }, config.getMaxRetry());
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void deleteRecursively(String dirName)
+  public void deleteFiles(Iterable<String> paths) throws IOException
   {
-    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
-        .withBucketName(config.getBucket())
-        .withPrefix(objectPath(dirName));
-    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
-    while (objectListing.getObjectSummaries().size() > 0) {
-      List<DeleteObjectsRequest.KeyVersion> deleteObjectsRequestKeys = objectListing.getObjectSummaries()
-                                                                                    .stream()
-                                                                                    .map(S3ObjectSummary::getKey)
-                                                                                    .map(DeleteObjectsRequest.KeyVersion::new)
-                                                                                    .collect(Collectors.toList());
-      DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(config.getBucket()).withKeys(
-          deleteObjectsRequestKeys);
-      s3Client.deleteObjects(deleteObjectsRequest);
+    int currentItemSize = 0;
+    List<DeleteObjectsRequest.KeyVersion> versions = new ArrayList<>();
 
-      // If the listing is truncated, all S3 objects have been deleted, otherwise, fetch more using the continuation token
-      if (objectListing.isTruncated()) {
-        listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
-        objectListing = s3Client.listObjectsV2(listObjectsRequest);
-      } else {
-        break;
+    for (String path : paths) {
+      // appending base path to each path
+      versions.add(new DeleteObjectsRequest.KeyVersion(objectPath(path)));
+      currentItemSize++;
+      if (currentItemSize == MAX_NUMBER_OF_LISTINGS) {
+        deleteKeys(versions);
+        // resetting trackers
+        versions.clear();
+        currentItemSize = 0;
       }
     }
+    // deleting remaining elements
+    if (currentItemSize != 0) {
+      deleteKeys(versions);
+    }
   }
 
-  @Override
-  public List<String> listDir(String dirName)
+  private void deleteKeys(List<DeleteObjectsRequest.KeyVersion> versions) throws IOException
   {
-    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
-        .withBucketName(config.getBucket())
-        .withPrefix(objectPath(dirName))
-        .withDelimiter(DELIM);
+    try {
+      S3Utils.deleteBucketKeys(s3Client, config.getBucket(), versions, config.getMaxRetry());
+    }
+    catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 
-    List<String> lsResult = new ArrayList<>();
 
-    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
+  @Override
+  public void deleteRecursively(String dirName) throws IOException
+  {
+    try {
+      S3Utils.deleteObjectsInPath(
+          s3Client,
+          MAX_NUMBER_OF_LISTINGS,
+          config.getBucket(),
+          objectPath(dirName),
+          Predicates.alwaysTrue(),
+          config.getMaxRetry()
+      );
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting files in path [%s]. Error: [%s]", dirName, e.getMessage());
+      throw new IOException(e);
+    }
+  }
 
-    while (objectListing.getObjectSummaries().size() > 0) {
-      objectListing.getObjectSummaries()
-                   .stream().map(S3ObjectSummary::getKey)
-                   .map(
-                       key -> {
-                         int index = key.lastIndexOf(DELIM);
-                         return key.substring(index + 1);
-                       }
-                   )
-                   .filter(keyPart -> !keyPart.isEmpty())
-                   .forEach(lsResult::add);
+  @Override
+  public Iterator<String> listDir(String dirName) throws IOException
+  {
+    final String prefixBasePath = objectPath(dirName);
+    try {
 
-      if (objectListing.isTruncated()) {
-        listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
-        objectListing = s3Client.listObjectsV2(listObjectsRequest);
-      } else {
-        break;
-      }
+      Iterator<S3ObjectSummary> files = S3Utils.objectSummaryIterator(
+          s3Client,
+          ImmutableList.of(new CloudObjectLocation(
+              config.getBucket(),
+              prefixBasePath
+          ).toUri("s3")),
+          MAX_NUMBER_OF_LISTINGS,
+          config.getMaxRetry()
+      );
+
+      return Iterators.transform(files, summary -> {
+        String[] size = summary.getKey().split(prefixBasePath, 2);
+        if (size.length > 1) {
+          return size[1];
+        } else {
+          return "";
+        }
+      });
+    }
+    catch (Exception e) {
+      log.error("Error occoured while listing files at path [%s]. Error: [%s]", dirName, e.getMessage());
+      throw new IOException(e);
     }
-    return lsResult;
   }
 
   @Nonnull
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 0a02dce4d2..6deda73e8e 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -28,6 +28,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -176,7 +177,7 @@ public class S3StorageConnectorTest
   }
 
   @Test
-  public void pathDelete() throws IOException
+  public void testDeleteSinglePath() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
     S3_CLIENT.deleteObject(BUCKET, PREFIX + "/" + TEST_FILE);
@@ -185,20 +186,42 @@ public class S3StorageConnectorTest
     EasyMock.reset(S3_CLIENT);
   }
 
+
+  @Test
+  public void testDeleteMultiplePaths() throws IOException
+  {
+    EasyMock.reset(S3_CLIENT);
+    String testFile2 = "file2";
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(BUCKET);
+    deleteObjectsRequest.withKeys(PREFIX + "/" + TEST_FILE, PREFIX + "/" + testFile2);
+    Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture();
+
+    EasyMock.expect(S3_CLIENT.deleteObjects(EasyMock.capture(capturedArgument))).andReturn(null).once();
+    EasyMock.replay(S3_CLIENT);
+    storageConnector.deleteFiles(Lists.newArrayList(TEST_FILE, testFile2));
+
+    Assert.assertEquals(convertDeleteObjectsRequestToString(deleteObjectsRequest), convertDeleteObjectsRequestToString(capturedArgument.getValue()));
+    EasyMock.reset(S3_CLIENT);
+  }
+
+
   @Test
-  public void pathDeleteRecursively() throws IOException
+  public void testPathDeleteRecursively() throws IOException
   {
     EasyMock.reset(S3_CLIENT, TEST_RESULT);
 
     S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
     s3ObjectSummary.setBucketName(BUCKET);
     s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
-
-    EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
-    EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
+    s3ObjectSummary.setSize(1);
     EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
             .andReturn(TEST_RESULT);
 
+    EasyMock.expect(TEST_RESULT.getBucketName()).andReturn("123").anyTimes();
+    EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).anyTimes();
+    EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false).times(1);
+    EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null);
+
     Capture<DeleteObjectsRequest> capturedArgument = EasyMock.newCapture();
     EasyMock.expect(S3_CLIENT.deleteObjects(EasyMock.and(
         EasyMock.capture(capturedArgument),
@@ -214,21 +237,33 @@ public class S3StorageConnectorTest
   }
 
   @Test
-  public void testListDir()
+  public void testListDir() throws IOException
   {
     EasyMock.reset(S3_CLIENT, TEST_RESULT);
 
     S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
     s3ObjectSummary.setBucketName(BUCKET);
     s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
+    s3ObjectSummary.setSize(1);
 
     EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
     EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
+    EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null);
     EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
             .andReturn(TEST_RESULT);
     EasyMock.replay(S3_CLIENT, TEST_RESULT);
 
-    List<String> listDirResult = storageConnector.listDir("/");
+    List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/"));
     Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
   }
+
+  private String convertDeleteObjectsRequestToString(DeleteObjectsRequest deleteObjectsRequest)
+  {
+    return deleteObjectsRequest.getKeys()
+                               .stream()
+                               .map(keyVersion -> keyVersion.getKey() + keyVersion.getVersion())
+                               .collect(
+                                   Collectors.joining());
+  }
+
 }
diff --git a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
index 6e6bbd9fe9..8aea264bc3 100644
--- a/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
+++ b/processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java
@@ -19,15 +19,20 @@
 
 package org.apache.druid.frame.util;
 
+import com.google.common.base.Splitter;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.StringUtils;
 
+import javax.annotation.Nullable;
+import java.util.Iterator;
+
 /**
  * Helper class that fetches the directory and file names corresponding to file location
  */
 public class DurableStorageUtils
 {
   public static final String SUCCESS_MARKER_FILENAME = "__success";
+  public static final Splitter SPLITTER = Splitter.on("/").limit(2);
 
   public static String getControllerDirectory(final String controllerTaskId)
   {
@@ -45,8 +50,7 @@ public class DurableStorageUtils
         stageNumber,
         workerNumber
     );
-    String fileName = StringUtils.format("%s/%s", folderName, SUCCESS_MARKER_FILENAME);
-    return fileName;
+    return StringUtils.format("%s/%s", folderName, SUCCESS_MARKER_FILENAME);
   }
 
   /**
@@ -121,4 +125,29 @@ public class DurableStorageUtils
         path
     );
   }
+
+  /**
+   * Tries to parse out the controller taskID from the input path, i.e. everything before the first {@code /}.
+   * <br/>
+   * For eg:
+   * <ul>
+   *   <li>for input path <b>controller_query_id/task/123</b> the function will return <b>controller_query_id</b></li>
+   *   <li>for input path <b>abcd</b>, the function will return <b>abcd</b></li>
+   *   <li>for input path <b>/abcd/123</b> (leading slash), the function will return an empty string</li>
+   *   <li>for input path <b>null</b>, the function will return <b>null</b></li>
+   * </ul>
+   *
+   * @param path relative path to parse, may be null
+   * @return the first path segment, or null if {@code path} is null
+   */
+  @Nullable
+  public static String getControllerTaskIdWithPrefixFromPath(@Nullable String path)
+  {
+    if (path == null) {
+      return null;
+    }
+    // Splitter never returns an empty iterable (splitting "" yields [""]), so next() cannot fail here.
+    Iterator<String> elements = SPLITTER.split(path).iterator();
+    return elements.next();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
index e643882e49..3d1ead89b1 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
+++ b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
@@ -24,7 +24,7 @@ import org.apache.druid.guice.annotations.UnstableApi;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
+import java.util.Iterator;
 
 /**
  * Low level interface for interacting with different storage providers like S3, GCS, Azure and local file system.
@@ -105,26 +105,39 @@ public interface StorageConnector
    * with a basePath.
    * If the path is a directory, this method throws an exception.
    *
-   * @param path
-   * @throws IOException
+   * @param path to delete
+   * @throws IOException thrown in case of errors.
    */
   void deleteFile(String path) throws IOException;
 
+
+  /**
+   * Delete files present at the input paths. Most implementations prepend all the input paths
+   * with the basePath.
+   * <br/>
+   * This method is <b>recommended</b> over repeated {@link #deleteFile} calls when deleting a batch of files.
+   * If any of the paths is a directory, this method throws an exception.
+   *
+   * @param paths Iterable of the paths to delete.
+   * @throws IOException thrown in case of errors.
+   */
+  void deleteFiles(Iterable<String> paths) throws IOException;
+
   /**
    * Delete a directory pointed to by the path and also recursively deletes all files/directories in said directory.
    * Most implementations prepend the input path with a basePath.
    *
    * @param path path
-   * @throws IOException
+   * @throws IOException thrown in case of errors.
    */
   void deleteRecursively(String path) throws IOException;
 
   /**
-   * Returns a list containing all the files present in the path. The returned filenames should be such that joining
+   * Returns a lazy iterator containing all the files present in the path. The returned filenames should be such that joining
    * the dirName and the file name form the full path that can be used as the arguments for other methods of the storage
    * connector.
-   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
-   * "parent1/parent2" should be "child" and for "parent1" should be "parent2"
+   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the input path
+   * "parent1/parent2" should be "child" and for input "parent1" should be "parent2/child"
    */
-  List<String> listDir(String dirName);
+  Iterator<String> listDir(String dirName) throws IOException;
 }
diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
index 366167dcdc..737056554e 100644
--- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
+++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
@@ -36,8 +36,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Iterator;
 
 /**
  * Implementation that uses local filesystem. All paths are appended with the base path, in such a way that it is not visible
@@ -91,9 +90,9 @@ public class LocalFileStorageConnector implements StorageConnector
    * In case the parent directory does not exist, we create the parent dir recursively.
    * Closing of the stream is the responsibility of the caller.
    *
-   * @param path
-   * @return
-   * @throws IOException
+   * @param path path to write contents to.
+   * @return OutputStream which can be used by callers to write contents.
+   * @throws IOException thrown in case of errors.
    */
   @Override
   public OutputStream write(String path) throws IOException
@@ -104,10 +103,10 @@ public class LocalFileStorageConnector implements StorageConnector
   }
 
   /**
-   * Deletes the file present at the location basePath + path. Throws an excecption in case a dir is encountered.
+   * Deletes the file present at the location basePath + path. Throws an exception in case a dir is encountered.
    *
-   * @param path
-   * @throws IOException
+   * @param path input path
+   * @throws IOException thrown in case of errors.
    */
   @Override
   public void deleteFile(String path) throws IOException
@@ -115,16 +114,30 @@ public class LocalFileStorageConnector implements StorageConnector
     File toDelete = fileWithBasePath(path);
     if (toDelete.isDirectory()) {
       throw new IAE(StringUtils.format(
-          "Found a directory on path[%s]. Please use deleteRecusively to delete dirs", path));
+          "Found a directory on path[%s]. Please use deleteRecursively to delete dirs", path));
     }
     Files.delete(fileWithBasePath(path).toPath());
   }
 
+  /**
+   * Deletes the files present at each basePath + path, in iteration order, by calling deleteFile for each one.
+   *
+   * @param paths paths to delete; hitting a directory throws and leaves earlier deletions in place
+   * @throws IOException thrown in case of errors.
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) throws IOException
+  {
+    for (String path : paths) {
+      deleteFile(path);
+    }
+  }
+
   /**
    * Deletes the files and sub dirs present at the basePath + dirName. Also removes the dirName
    *
    * @param dirName path
-   * @throws IOException
+   * @throws IOException thrown in case of errors.
    */
   @Override
   public void deleteRecursively(String dirName) throws IOException
@@ -133,7 +146,7 @@ public class LocalFileStorageConnector implements StorageConnector
   }
 
   @Override
-  public List<String> listDir(String dirName)
+  public Iterator<String> listDir(String dirName)
   {
     File directory = fileWithBasePath(dirName);
     if (!directory.exists()) {
@@ -146,7 +159,7 @@ public class LocalFileStorageConnector implements StorageConnector
     if (files == null) {
       throw new ISE("Unable to fetch the file list from the path [%s]", dirName);
     }
-    return Arrays.stream(files).map(File::getName).collect(Collectors.toList());
+    return Arrays.stream(files).map(File::getName).iterator();
   }
 
   public File getBasePath()
diff --git a/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
new file mode 100644
index 0000000000..8e32038107
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/frame/util/DurableStorageUtilsTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DurableStorageUtilsTest
+{
+  @Test
+  public void getControllerTaskIdWithPrefixFromPath()
+  {
+    // Only the first path segment is returned; a leading slash yields an empty segment.
+    Assert.assertNull(DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(null));
+    Assert.assertEquals("", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(""));
+    Assert.assertEquals("", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("/123/123"));
+    Assert.assertEquals("123", DurableStorageUtils.getControllerTaskIdWithPrefixFromPath("123"));
+    final String expected = "controller_query_123";
+    Assert.assertEquals(expected, DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(expected + "/123"));
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java
index d1eaf3a73b..4bc8886f99 100644
--- a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java
+++ b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.storage.local;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorProvider;
@@ -36,7 +38,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -103,7 +104,22 @@ public class LocalFileStorageConnectorTest
     storageConnector.deleteRecursively(uuid_base);
     Assert.assertFalse(baseFile.exists());
     Assert.assertTrue(new File(tempDir.getAbsolutePath(), topLevelDir).exists());
+  }
 
+  @Test
+  public void batchDelete() throws IOException
+  {
+    String uuid1 = UUID.randomUUID().toString();
+    String uuid2 = UUID.randomUUID().toString();
+
+    // create both files
+    createAndPopulateFile(uuid1);
+    createAndPopulateFile(uuid2);
+
+    // delete both files with a single batch call, then verify neither remains on disk
+    storageConnector.deleteFiles(ImmutableList.of(uuid1, uuid2));
+    Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid1).exists());
+    Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid2).exists());
   }
 
   @Test
@@ -126,29 +142,25 @@ public class LocalFileStorageConnectorTest
     createAndPopulateFile(uuid1);
     createAndPopulateFile(uuid2);
 
-    List<String> topLevelDirContents = storageConnector.listDir(topLevelDir);
+    List<String> topLevelDirContents = Lists.newArrayList(storageConnector.listDir(topLevelDir));
     List<String> expectedTopLevelDirContents = ImmutableList.of(new File(uuid_base).getName());
     Assert.assertEquals(expectedTopLevelDirContents, topLevelDirContents);
 
     // Converted to a set since the output of the listDir can be shuffled
-    Set<String> nextLevelDirContents = new HashSet<>(storageConnector.listDir(uuid_base));
+    Set<String> nextLevelDirContents = Sets.newHashSet(storageConnector.listDir(uuid_base));
     Set<String> expectedNextLevelDirContents = ImmutableSet.of(new File(uuid1).getName(), new File(uuid2).getName());
     Assert.assertEquals(expectedNextLevelDirContents, nextLevelDirContents);
 
     // Check if listDir throws if an unknown path is passed as an argument
     Assert.assertThrows(
         IAE.class,
-        () -> {
-          storageConnector.listDir("unknown_top_path");
-        }
+        () -> storageConnector.listDir("unknown_top_path")
     );
 
     // Check if listDir throws if a file path is passed as an argument
     Assert.assertThrows(
         IAE.class,
-        () -> {
-          storageConnector.listDir(uuid1);
-        }
+        () -> storageConnector.listDir(uuid1)
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org