Posted to commits@lucene.apache.org by yo...@apache.org on 2020/02/06 02:21:07 UTC

[lucene-solr] branch jira/SOLR-13101 updated: SOLR-14044: Support collection and shard deletion in shared storage (#1188)

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/SOLR-13101 by this push:
     new 3e8ca67  SOLR-14044: Support collection and shard deletion in shared storage (#1188)
3e8ca67 is described below

commit 3e8ca67f6f5d87b103148824e2bf158d5d5330da
Author: Andy Vuong <an...@users.noreply.github.com>
AuthorDate: Wed Feb 5 18:20:56 2020 -0800

    SOLR-14044: Support collection and shard deletion in shared storage (#1188)
    
    * Support collection and shard deletion in shared storage
    
    * Add end to end collection api delete tests and fix local client test
    
    * Fix timestamps
    
    * Remove debug log line and fix timestamps
    
    * Address review comments and fix test
    
    * Close resource and throw exception on failure
---
 .../cloud/api/collections/DeleteCollectionCmd.java |  45 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |  36 +-
 .../solr/store/blob/client/CoreStorageClient.java  |   5 +
 .../solr/store/blob/client/LocalStorageClient.java |  25 ++
 .../solr/store/blob/client/S3StorageClient.java    |  22 +
 .../solr/store/blob/metadata/CorePushPull.java     |  16 +-
 .../solr/store/blob/process/BlobDeleteManager.java | 193 +++++----
 .../store/blob/process/BlobDeleteProcessor.java    | 176 ++++++++
 .../solr/store/blob/process/BlobDeleterTask.java   | 276 +++++++++---
 .../solr/store/shared/SharedStoreManager.java      |  11 +-
 .../store/blob/client/CoreStorageClientTest.java   |  19 +
 .../blob/process/BlobDeleteProcessorTest.java      | 464 +++++++++++++++++++++
 .../process/SharedStoreDeletionProcessTest.java    | 329 +++++++++++++++
 .../store/shared/SolrCloudSharedStoreTestCase.java |  11 +-
 14 files changed, 1465 insertions(+), 163 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 648f5ba..4eee49b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -18,6 +18,13 @@
 
 package org.apache.solr.cloud.api.collections;
 
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -25,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -47,17 +55,14 @@ import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.handler.admin.MetricsHistoryHandler;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SharedStoreManager;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
@@ -142,6 +147,32 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           break;
         }
       }
+      
+      // Delete the collection files from shared store. We want to delete all of the files before we delete
+      // the collection state from ZooKeeper.
+      DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
+      if (docCollection != null && docCollection.getSharedIndex()) {
+        SharedStoreManager sharedStoreManager = ocmh.overseer.getCoreContainer().getSharedStoreManager();
+        BlobDeleteManager deleteManager = sharedStoreManager.getBlobDeleteManager();
+        BlobDeleteProcessor deleteProcessor = deleteManager.getOverseerDeleteProcessor();
+        // deletes all files belonging to this collection
+        CompletableFuture<BlobDeleterTaskResult> deleteFuture = 
+            deleteProcessor.deleteCollection(collection, false);
+        
+        BlobDeleterTaskResult result = null;
+        Throwable t = null;
+        try {
+          // TODO: Find a reasonable timeout value
+          result = deleteFuture.get(60, TimeUnit.SECONDS);
+        } catch (Exception ex) {
+          t = ex;
+        }
+        if (t != null || !result.isSuccess()) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not complete deleting collection " + 
+              collection + " from shared store; files belonging to this collection"
+                  + " may be orphaned.", t);
+        }
+      }
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index ce9f29d..692b1f5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -31,13 +31,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -47,6 +50,10 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SharedStoreManager;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +82,8 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
     }
 
     log.info("Delete shard invoked");
-    Slice slice = clusterState.getCollection(collectionName).getSlice(sliceId);
+    DocCollection docCollection = clusterState.getCollection(collectionName);
+    Slice slice = docCollection.getSlice(sliceId);
     if (slice == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
         "No shard with name " + sliceId + " exists for collection " + collectionName);
 
@@ -136,7 +144,31 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       log.debug("Waiting for delete shard action to complete");
       cleanupLatch.await(1, TimeUnit.MINUTES);
       
-      ocmh.overseer.getShardSharedMetadataController().cleanUpMetadataNodes(collectionName, slice.getName());
+      if (docCollection != null && docCollection.getSharedIndex()) {
+        ocmh.overseer.getShardSharedMetadataController().cleanUpMetadataNodes(collectionName, slice.getName());
+        
+        SharedStoreManager sharedStoreManager = ocmh.overseer.getCoreContainer().getSharedStoreManager();
+        BlobDeleteManager deleteManager = sharedStoreManager.getBlobDeleteManager();
+        BlobDeleteProcessor deleteProcessor = deleteManager.getOverseerDeleteProcessor();
+        
+        String sharedShardName = (String) slice.get(ZkStateReader.SHARED_SHARD_NAME);
+        CompletableFuture<BlobDeleterTaskResult> deleteFuture =
+            deleteProcessor.deleteShard(collectionName, sharedShardName, false);
+        
+        BlobDeleterTaskResult result = null;
+        Throwable t = null;
+        try {
+          // TODO: Find a reasonable timeout value
+          result = deleteFuture.get(60, TimeUnit.SECONDS);
+        } catch (Exception ex) {
+          t = ex;
+        }
+        if (t != null || !result.isSuccess()) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not complete deleting shard " + 
+              slice.getName() + " from shared store belonging to collection " + collectionName +
+              ". Files belonging to this shard may be orphaned.", t);
+        }
+      }
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
           collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
index 901b836..9a4872c 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
@@ -136,6 +136,11 @@ public interface CoreStorageClient {
   public void shutdown();
   
   /**
+   * Lists the names of all blob files whose paths begin with the given prefix
+   */
+  public List<String> listCoreBlobFiles(String prefix) throws BlobException;
+  
+  /**
   * Lists the blob file names of all files under a given core name's blob store
   * hierarchy that are older than the given timestamp value in milliseconds. Important to 
   * note that the wall clock of your caller will vary with that of the blob store service
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
index a25e42b..4f8130a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -33,6 +34,8 @@ import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.lucene.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that handles reads and writes of solr blob files to the local file system.
@@ -237,6 +240,28 @@ public class LocalStorageClient implements CoreStorageClient {
   public void shutdown() {
       
   }
+  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  @Override
+  public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+    try {
+      String rootBlobDir = getBlobAbsolutePath("");
+      Path path = Paths.get(rootBlobDir);
+      // try-with-resources so the stream backing Files.walk is closed and directory handles aren't leaked
+      try (Stream<Path> walk = Files.walk(path)) {
+        return walk.map(Path::toFile)
+            .filter(file -> !file.isDirectory())
+            // extracts just the file system blob file name without the root dir
+            .map(file -> file.getAbsolutePath().substring(rootBlobDir.length()))
+            .filter(name -> name.startsWith(prefix))
+            .distinct()
+            .collect(Collectors.toList());
+      }
+    } catch (Exception ex) {
+      throw new BlobException(ex);
+    }
+  }
     
   @Override
   public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
index c981b96..dc40697 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
@@ -270,6 +270,28 @@ public class S3StorageClient implements CoreStorageClient {
   public void shutdown() {
     s3Client.shutdown();
   }
+  
+  @Override
+  public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+    ListObjectsRequest listRequest = new ListObjectsRequest();
+    listRequest.setBucketName(blobBucketName);
+    listRequest.setPrefix(prefix);
+    
+    List<String> blobFiles = new LinkedList<>();
+    try {
+      ObjectListing objectListing = s3Client.listObjects(listRequest);
+      iterateObjectListingAndConsume(objectListing, object -> {
+        blobFiles.add(object.getKey());
+      });
+      return blobFiles;
+    } catch (AmazonServiceException ase) {
+      throw handleAmazonServiceException(ase);
+    } catch (AmazonClientException ace) {
+      throw new BlobClientException(ace);
+    } catch (Exception ex) {
+      throw new BlobException(ex);
+    }
+  }
 
   @Override
   public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
index c659b9b..ff362e2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
@@ -49,11 +48,14 @@ import org.apache.solr.store.blob.client.ToFromJson;
 import org.apache.solr.store.blob.metadata.ServerSideMetadata.CoreFileData;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Class pushing updates from the local core to the Blob Store and pulling updates from Blob store to local core.
  * This class knows about the Solr core directory structure and can translate it into file system abstractions since that's
@@ -510,6 +512,9 @@ public class CorePushPull {
       return BlobStoreUtils.getCurrentTimeMs() - file.getDeletedAt() >= deleteManager.getDeleteDelayMs();
     }
     
+    /**
+     * @return true if the files were enqueued for deletion successfully
+     */
     @VisibleForTesting
     protected boolean enqueueForDelete(String coreName, Set<BlobCoreMetadata.BlobFileToDelete> blobFiles) {
       if (blobFiles == null || blobFiles.isEmpty()) {
@@ -518,7 +523,14 @@ public class CorePushPull {
       Set<String> blobNames = blobFiles.stream()
                                 .map(blobFile -> blobFile.getBlobName())
                                 .collect(Collectors.toCollection(HashSet::new));
-      return deleteManager.enqueueForDelete(coreName, blobNames);
+      
+      BlobDeleteProcessor deleteProcessor = deleteManager.getDeleteProcessor();
+      try {
+        deleteProcessor.deleteFiles(coreName, blobNames, true);
+        return true;
+      } catch (Exception ex) {
+        return false;
+      }
     }
 
     /**
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
index f91680f..81ebd9e 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
@@ -18,40 +18,74 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.lucene.util.NamedThreadFactory;
-import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.metadata.CorePushPull;
+import org.apache.solr.store.blob.metadata.ServerSideMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
- * Manager of blobs (files) to delete, putting them in a queue (if space left on the queue) then consumed and processed
- * by {@link BlobDeleterTask}
+ * This class manages the deletion machinery required by shared storage enabled collections. Its responsibilities
+ * include the allocation and management of bounded deletion task queues and their consumers. 
+ * 
+ * Deletion of blob files from shared store happens on two paths:
+ *  1. In the indexing path, the local {@link SolrCore}'s index files, represented by an instance of a
+ *  {@link ServerSideMetadata} object, are resolved against the blob store's core.metadata file, the
+ *  source of truth for what index files a {@link SolrCore} should have. As the difference between
+ *  these two metadata instances is resolved, we add files to be deleted to the BlobDeleteManager, which
+ *  enqueues a {@link BlobDeleterTask} for asynchronous processing.
+ *  2. In the collection admin API, we may delete a collection or a collection shard. In the former, all index
+ *  files belonging to the specified collection on shared storage should be deleted, while in the latter
+ *  all index files belonging to a particular collection/shard pair should be deleted.
+ * 
+ * Shard leaders are the only replicas that receive indexing traffic and push to shared store in a shared
+ * collection, so any Solr node in a cluster may be sending deletion requests to the shared storage provider
+ * at a given moment. Collection commands are only processed by the Overseer, and therefore only the Overseer
+ * should be deleting entire collections or shard files from shared storage.
+ * 
+ * The BlobDeleteManager maintains two queues to prevent any potential starvation: one for the incremental
+ * indexing deletion path, which is always initiated when a Solr node with shared collections starts up, and
+ * one that is only used when the current node is the Overseer and handles Overseer-specific actions.
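+ * 
+ * A minimal sketch of how callers pick a processor (illustrative only; {@code client},
+ * {@code collectionName} and {@code blobNames} are assumed to be in scope):
+ * <pre>
+ * BlobDeleteManager manager = new BlobDeleteManager(client); // default queue size, threads, delay
+ * // indexing path: per-file deletes enqueued during metadata resolution
+ * manager.getDeleteProcessor().deleteFiles(collectionName, blobNames, true);
+ * // Overseer-only path: whole-collection or whole-shard deletes
+ * manager.getOverseerDeleteProcessor().deleteCollection(collectionName, false);
+ * </pre>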
  */
 public class BlobDeleteManager {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
   /**
+   * Identifier for a BlobDeleteProcessor that runs on all Solr nodes containing shared collections
+   */
+  public static final String BLOB_FILE_DELETER = "BlobFileDeleter";
+  
+  /**
+   * Identifier for a BlobDeleteProcessor that runs on the Overseer if the Solr cluster contains
+   * any shared collection
+   */
+  public static final String OVERSEER_BLOB_FILE_DELETER = "OverseerBlobFileDeleter";
+  
+  /**
    * Limit to the number of blob files to delete accepted on the delete queue (and lost in case of server crash). When
    * the queue reaches that size, no more deletes are accepted (will be retried later for a core, next time it is pushed).
-   * (note that tests in searchserver.blobstore.metadata.CorePushTest trigger a merge that enqueues more than 100 files to
-   * be deleted. If that constant is reduced to 100 for example, some enqueues in the test will fail and there will be
-   * files left to be deleted where we've expected none. So don't reduce it too much :)
    */
-  private static final int ALMOST_MAX_DELETER_QUEUE_SIZE = 200;
+  private static final int DEFAULT_ALMOST_MAX_DELETER_QUEUE_SIZE = 200;
+  
+  private static final int DEFAULT_THREAD_POOL_SIZE = 5;
   
-  private final CoreStorageClient client;
-  private final RejecterThreadPoolExecutor deleteExecutor;
+  // TODO: 30 seconds is currently chosen arbitrarily; these values should be made configurable
+  private static final int DEFAULT_DELETE_DELAY_MS = 30000;
+
+  private static int MAX_DELETE_ATTEMPTS = 50;
+  private static long SLEEP_MS_FAILED_ATTEMPT = TimeUnit.SECONDS.toMillis(10);
+  
+  private final CoreStorageClient storageClient;
+  
+  private final BlobDeleteProcessor deleteProcessor;
+  private final BlobDeleteProcessor overseerDeleteProcessor;
 
   /**
    * After a core push has marked a file as deleted, wait at least this long before actually deleting its blob from the
@@ -71,92 +105,81 @@ public class BlobDeleteManager {
    * delete until we know for sure the file can be resuscitated...
    */
   private final long deleteDelayMs;
+  
+  private AtomicBoolean isShutdown; 
 
   /**
-   * TODO : Creates a default delete client, should have config based one  
+   * Creates a new BlobDeleteManager with the provided {@link CoreStorageClient}, using default
+   * values for deleteDelayMs, queue size, and thread pool size. A default {@link BlobDeleteProcessor}
+   * is also created.
    */
   public BlobDeleteManager(CoreStorageClient client) {
-    // 30 seconds
-    this(client, ALMOST_MAX_DELETER_QUEUE_SIZE, 5, 30000);
+    this(client, DEFAULT_ALMOST_MAX_DELETER_QUEUE_SIZE, DEFAULT_THREAD_POOL_SIZE, DEFAULT_DELETE_DELAY_MS);
   }
   
+  /**
+   * Creates a new BlobDeleteManager with the specified BlobDeleteProcessors.
+   */
+  @VisibleForTesting
+  public BlobDeleteManager(CoreStorageClient client, long deleteDelayMs, BlobDeleteProcessor deleteProcessor,
+      BlobDeleteProcessor overseerDeleteProcessor) {
+    this.deleteDelayMs = deleteDelayMs;
+    this.storageClient = client;
+    this.isShutdown = new AtomicBoolean(false);
+    this.deleteProcessor = deleteProcessor;
+    this.overseerDeleteProcessor = overseerDeleteProcessor;
+  }
+  
+  /**
+   * Creates a new BlobDeleteManager and default {@link BlobDeleteProcessor}.
+   */
   public BlobDeleteManager(CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, long deleteDelayMs) {
-    NamedThreadFactory threadFactory = new NamedThreadFactory("BlobFileDeleter");
-
-    // Note this queue MUST NOT BE BOUNDED, or we risk deadlocks given that BlobDeleterTask's reenqueue themselves upon failure
-    BlockingQueue<Runnable> deleteQueue = new LinkedBlockingDeque<>();
-
-    deleteExecutor = new RejecterThreadPoolExecutor(numDeleterThreads, deleteQueue, almostMaxQueueSize, threadFactory);
-
     this.deleteDelayMs = deleteDelayMs;
-    this.client = client;
+    this.storageClient = client;
+    this.isShutdown = new AtomicBoolean(false);
+    
+    deleteProcessor = new BlobDeleteProcessor(BLOB_FILE_DELETER, storageClient, almostMaxQueueSize, numDeleterThreads,
+        MAX_DELETE_ATTEMPTS, SLEEP_MS_FAILED_ATTEMPT);
+    
+    // Non-Overseer nodes will instantiate a delete processor but the underlying pool will sit idle
+    // until the node is elected Overseer and tasks are added. The overhead should be small.
+    overseerDeleteProcessor = new BlobDeleteProcessor(OVERSEER_BLOB_FILE_DELETER, storageClient, almostMaxQueueSize, numDeleterThreads,
+        MAX_DELETE_ATTEMPTS, SLEEP_MS_FAILED_ATTEMPT);
   }
-
-  public void shutdown() {
-    deleteExecutor.shutdown();
+  
+  /**
+   * Returns a delete processor for normal indexing flow shared store deletion
+   */
+  public BlobDeleteProcessor getDeleteProcessor() {
+    return deleteProcessor;
   }
-
-  public long getDeleteDelayMs() {
-    return deleteDelayMs;
+  
+  /**
+   * Returns a delete processor specifically allocated for Overseer use
+   */
+  public BlobDeleteProcessor getOverseerDeleteProcessor() {
+    return overseerDeleteProcessor;
   }
 
   /**
-   * This method is called 'externally" (i.e. not by tasks needing to reenqueue) and enq
-   * @return <code>true</code> if the delete was enqueued, <code>false</code> if can't be enqueued (deleter turned off
-   * by config or current queue of blobs file deletes too full).
+   * Shuts down all {@link BlobDeleteProcessor} instances currently managed by the BlobDeleteManager
    */
-  public boolean enqueueForDelete(String sharedBlobName, Set<String> blobNames) {
-    BlobDeleterTask command = new BlobDeleterTask(client, sharedBlobName, blobNames, deleteExecutor);
-    return deleteExecutor.executeIfPossible(command);
+  public void shutdown() {
+    isShutdown.set(true);
+    log.info("BlobDeleteManager is shutting down");
+    
+    deleteProcessor.close();
+    log.info("BlobDeleteProcessor " + deleteProcessor.getName() + " has shut down");
+    overseerDeleteProcessor.close();
+    log.info("BlobDeleteProcessor " + overseerDeleteProcessor.getName() + " has shut down");
   }
 
   /**
-   * Subclass of {@link ThreadPoolExecutor} that has an additional command enqueue method {@link #executeIfPossible(Runnable)}
-   * that rejects the enqueue if the underlying queue is over a configured size.<p>
-   * The created thread pool executor has a fixed number of threads because the undelying blocking queue is unbounded.
+   * Gets the delay that a file deleted via the indexing flow must wait before it is 
+   * eligible for deletion from shared storage. See {@link BlobDeleteManager#deleteDelayMs} for more
+   * details.
    */
-  private class RejecterThreadPoolExecutor extends MDCAwareThreadPoolExecutor {
-    private final int targetMaxQueueSize;
-    /**
-     * @param poolSize the number of threads to keep in the pool. There is a fixed number of threads in that pool,
-     *                 because a {@link ThreadPoolExecutor} using an unbounded queue will not have the pool create more threads
-     *                 than the core pool size, even tasks are slow to execute.
-     * @param workQueue the queue to use for holding tasks before they are
-     *        executed.  This queue will hold only the {@code Runnable}
-     *        tasks submitted by the {@code execute} method.
-     * @param targetMaxQueueSize max queue size to accept enqueues through {@link #executeIfPossible(Runnable)} but having
-     *        no impact on enqueues through {@link #execute(Runnable)}.
-     * @param threadFactory the factory to use when the executor
-     *        creates a new thread
-     */
-    RejecterThreadPoolExecutor(int poolSize,
-                              BlockingQueue<Runnable> workQueue,
-                              int targetMaxQueueSize,
-                              ThreadFactory threadFactory) {
-      super(poolSize, poolSize, 0L, TimeUnit.SECONDS, workQueue, threadFactory);
-      this.targetMaxQueueSize = targetMaxQueueSize;
-    }
-
-    /**
-     * Enqueues the passed <code>command</code> for execution just like a call to superclass' {@link ThreadPoolExecutor#execute(Runnable)}
-     * if the work queue is not too full (below or at size <code>targetMaxQueueSize</code> passed in the constructor).
-     * @param command the task to execute
-     * @return <code>true</code> if the <code>command</code> was accepted and enqueued for execution (or executed),
-     * <code>false</code> if the underlying queue was too full and the <code>command</code> was not enqueued for execution
-     * (i.e. nothing happened).
-     */
-    boolean executeIfPossible(Runnable command) {
-      if (getQueue().size() > targetMaxQueueSize) {
-        return false;
-      }
-
-      try {
-        execute(command);
-      } catch (RejectedExecutionException ree) {
-        // pool might be shutting down
-        return false;
-      }
-      return true;
-    }
+  public long getDeleteDelayMs() {
+    return deleteDelayMs;
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java
new file mode 100644
index 0000000..f15fbb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
+import org.apache.solr.store.blob.client.BlobClientUtils;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobFileDeletionTask;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobPrefixedFileDeletionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A generic deletion processor used for deleting object files from shared
+ * storage. Each processor manages its own bounded task queue and thread pool for processing
+ * {@link BlobDeleterTask}s asynchronously. Processors support retrying tasks if 
+ * necessary, but retry decisions are left to the individual task implementations.
+ * 
+ * Instances of {@link BlobDeleteProcessor} are managed by the {@link BlobDeleteManager}.
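+ * 
+ * A minimal usage sketch (assumes an already configured {@link CoreStorageClient} named {@code client};
+ * the sizing values shown are illustrative, not recommendations):
+ * <pre>
+ * BlobDeleteProcessor processor = new BlobDeleteProcessor("MyDeleter", client,
+ *     200, 5, 50, TimeUnit.SECONDS.toMillis(10));
+ * CompletableFuture&lt;BlobDeleterTaskResult&gt; future = processor.deleteCollection("myCollection", true);
+ * BlobDeleterTaskResult result = future.get(60, TimeUnit.SECONDS); // block only if the outcome is needed
+ * processor.close();
+ * </pre>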
+ */
+public class BlobDeleteProcessor implements Closeable {
+  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private final String name;
+  private final int almostMaxQueueSize;
+  /**
+   * Note we sleep() after each failed attempt, so multiply this value by {@link #fixedRetryDelay} to find
+   * out how long we'll retry (at least) if Blob access fails for some reason ("at least" because a retry is
+   * re-enqueued at the tail of the queue, so there might be additional processing delay if the queue is not
+   * empty and is processed before the enqueued retry is processed).
+   */
+  private final int maxDeleteAttempts;
+  private final long fixedRetryDelay;
+  private final CoreStorageClient client;
+  private final BlockingQueue<Runnable> deleteQueue;
+  private final MDCAwareThreadPoolExecutor deleteExecutor;
+  
+  /**
+   * @param name identifying the processor
+   * @param client the storage client to use in this processor
+   * @param almostMaxQueueSize the target max queue size
+   * @param numDeleterThreads the number of threads to configure in the underlying thread pool
+   * @param defaultMaxDeleteAttempts maximum number of attempts to retry any task enqueued in this processor
+   * @param fixedRetryDelay fixed time delay in ms between retry attempts
+   */
+  public BlobDeleteProcessor(String name, CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads,
+      int defaultMaxDeleteAttempts, long fixedRetryDelay) {
+    this.name = name;
+    this.almostMaxQueueSize = almostMaxQueueSize;
+    this.maxDeleteAttempts = defaultMaxDeleteAttempts;
+    this.fixedRetryDelay = fixedRetryDelay;
+    NamedThreadFactory threadFactory = new NamedThreadFactory(name);
+
+    // Note this queue MUST NOT BE BOUNDED, or we risk deadlocks given that BlobDeleterTask's 
+    // re-enqueue themselves upon failure
+    deleteQueue = new LinkedBlockingDeque<>();
+    
+    deleteExecutor = new MDCAwareThreadPoolExecutor(numDeleterThreads, numDeleterThreads, 0L, TimeUnit.SECONDS, deleteQueue, threadFactory);
+    this.client = client;
+  }
+  
+  /**
+   * Enqueues the given set of files for deletion from shared store as an async task. 
+   * 
+   * @param collectionName the name of the collection the files belong to
+   * @param blobNames set of file paths to delete from shared store
+   * @param allowRetry flag indicating if the task should be retried if it fails
+   */
+  public CompletableFuture<BlobDeleterTaskResult> deleteFiles(String collectionName, Set<String> blobNames, boolean allowRetry) {
+    BlobDeleterTask task = new BlobFileDeletionTask(client, collectionName, blobNames, 
+        allowRetry, maxDeleteAttempts);
+    return enqueue(task, false);
+  }
+  
+  /**
+   * Enqueues a task to delete all files belonging to the given collection from shared store as an async task.
+   * 
+   * @param collectionName the name of the collection to be deleted from shared store
+   * @param allowRetry flag indicating if the task should be retried if it fails
+   */
+  public CompletableFuture<BlobDeleterTaskResult> deleteCollection(String collectionName, boolean allowRetry) {
+    BlobDeleterTask task = new BlobPrefixedFileDeletionTask(client, collectionName, collectionName, 
+        allowRetry, maxDeleteAttempts);
+    return enqueue(task, false);
+  }
+  
+  /**
+   * Enqueues a task to delete all files belonging to the given collection and {@link ZkStateReader#SHARED_SHARD_NAME} 
+   * from shared store as an async task.
+   * 
+   * @param collectionName the name of the collection the files belong to
+   * @param sharedShardName the identifier for the shardId located on the shared store  
+   * @param allowRetry flag indicating if the task should be retried if it fails
+   */
+  public CompletableFuture<BlobDeleterTaskResult> deleteShard(String collectionName, String sharedShardName, boolean allowRetry) {
+    BlobDeleterTask task = new BlobPrefixedFileDeletionTask(client, collectionName, 
+        sharedShardName + BlobClientUtils.BLOB_FILE_PATH_DELIMITER, allowRetry, maxDeleteAttempts);
+    return enqueue(task, false);
+  }
+  
+  /**
+   * Enqueues a task to be processed by a thread in the {@link BlobDeleteProcessor#deleteExecutor} thread
+   * pool. The callback is handled by the same execution thread and will re-enqueue a task that has failed
+   * and should be retried. Tasks that are enqueued via the retry mechanism are not bound by the same size
+   * constraints as newly minted tasks are.
+   * 
+   * @return a CompletableFuture allowing calling threads to block on the 
+   * computation result as needed, retrieve suppressed exceptions from retries, etc.
+   */
+  @VisibleForTesting
+  protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
+    if (!isRetry && deleteQueue.size() > almostMaxQueueSize) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
+          "Delete queue is full; unable to enqueue deletion task: " + task.toString());
+    }
+
+    return CompletableFuture.supplyAsync(() -> {
+      return task.call();
+    }, deleteExecutor)
+    .thenCompose(result -> {
+      // the callback will execute on the same thread as the executing task
+      if (!result.isSuccess() && result.shouldRetry()) {
+        try {
+          // Some delay before retry... (We could move this delay to before retrying a file that previously
+          // failed to be deleted; that way, if the queue is busy and it took time to retry, we don't add an
+          // additional delay on top of that. On the other hand, an exception here could be an issue with the
+          // Blob store itself and nothing specific to the file at hand, so slowing all delete attempts for
+          // all files might make sense.)
+          Thread.sleep(fixedRetryDelay);
+          return enqueue(result.getTask(), result.shouldRetry());
+        } catch (Exception ex) {
+          log.error("Could not re-enqueue failed deleter task that should have been enqueued!", ex);
+        }
+      }
+      return CompletableFuture.completedFuture(result);
+    });
+  }
+  
+  @Override
+  public void close() {
+    deleteExecutor.shutdown();
+  }  
+  
+  public String getName() {
+    return name;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
index c2eaa53..e3fdecd 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
@@ -18,95 +18,245 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Locale;
 import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Task in charge of deleting Blobs (files) from blob store.
+ * Generic deletion task for files located on shared storage
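+ * 
+ * A minimal sketch of what a concrete task looks like (a hypothetical subclass for illustration,
+ * imports omitted; see {@link BlobFileDeletionTask} and {@link BlobPrefixedFileDeletionTask} for
+ * the real implementations):
+ * <pre>
+ * class SingleFileDeletionTask extends BlobDeleterTask {
+ *   private final CoreStorageClient client;
+ *   private final String blobName;
+ * 
+ *   SingleFileDeletionTask(CoreStorageClient client, String collectionName, String blobName) {
+ *     super(client, collectionName, false, 1); // allowRetry=false, single attempt
+ *     this.client = client; // keep our own reference; the parent's is private
+ *     this.blobName = blobName;
+ *   }
+ * 
+ *   public Collection&lt;String&gt; doDelete() throws Exception {
+ *     client.deleteBlobs(Collections.singleton(blobName));
+ *     return Collections.singleton(blobName);
+ *   }
+ * 
+ *   public String getActionName() { return "DELETE_SINGLE"; }
+ * }
+ * </pre>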
  */
-class BlobDeleterTask implements Runnable {
+public abstract class BlobDeleterTask implements Callable<BlobDeleterTaskResult> {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * Note we sleep() after each failed attempt, so multiply this value by {@link #SLEEP_MS_FAILED_ATTEMPT} to find
-   * out how long we'll retry (at least) if Blob access fails for some reason ("at least" because we
-   * re-enqueue at the tail of the queue ({@link BlobDeleteManager} creates a list), so there might be additional
-   * processing delay if the queue is not empty and is processed before the enqueued retry is processed).
-   */
-  private static int MAX_DELETE_ATTEMPTS = 50;
-  private static long SLEEP_MS_FAILED_ATTEMPT = TimeUnit.SECONDS.toMillis(10);
-
+  
   private final CoreStorageClient client;
-  private final String sharedBlobName;
-  private final Set<String> blobNames;
+  private final String collectionName;
   private final AtomicInteger attempt;
-  private final ThreadPoolExecutor executor;
+  
   private final long queuedTimeMs;
+  private final int maxAttempts;
+  private final boolean allowRetry;
+  private Throwable err;
 
-  BlobDeleterTask(CoreStorageClient client, String sharedBlobName, Set<String> blobNames, ThreadPoolExecutor executor) {
-    this.client = client; 
-    this.sharedBlobName = sharedBlobName;
-    this.blobNames = blobNames;
+  public BlobDeleterTask(CoreStorageClient client, String collectionName, boolean allowRetry,
+      int maxAttempts) {
+    this.client = client;
+    this.collectionName = collectionName;
     this.attempt = new AtomicInteger(0);
-    this.executor = executor;
     this.queuedTimeMs = BlobStoreUtils.getCurrentTimeMs();
+    this.allowRetry = allowRetry;
+    this.maxAttempts = maxAttempts;
   }
-
+  
+  /**
+   * Performs a deletion request against shared storage for the given collection
+   * and returns the collection of file paths deleted.
+   */
+  public abstract Collection<String> doDelete() throws Exception;
+  
+  /**
+   * Returns a String representing the action performed by the BlobDeleterTask, for logging purposes
+   */
+  public abstract String getActionName();
+  
   @Override
-  public void run() {
+  public BlobDeleterTaskResult call() {
+    List<String> filesDeleted = new LinkedList<>();
     final long startTimeMs = BlobStoreUtils.getCurrentTimeMs();
     boolean isSuccess = true;
-      
+    boolean shouldRetry = false;
     try {
+      filesDeleted.addAll(doDelete());
+      attempt.incrementAndGet();
+      return new BlobDeleterTaskResult(this, filesDeleted, isSuccess, shouldRetry, err);
+    } catch (Exception ex) {
+      if (err == null) {
+        err = ex;
+      } else {
+        err.addSuppressed(ex);
+      }
+      int attempts = attempt.incrementAndGet();
+      isSuccess = false;
+      log.warn("BlobDeleterTask failed on attempt=" + attempts  + " collection=" + collectionName
+          + " task=" + toString(), ex);
+      if (allowRetry) {
+        if (attempts < maxAttempts) {
+          shouldRetry = true;
+        } else {
+          log.warn("Reached " + maxAttempts + " attempt limit for deletion task " + toString() + 
+              ". This task won't be retried.");
+        }
+      }
+    } finally {
+      // use the same clock source as startTimeMs and queuedTimeMs so the computed durations are consistent
+      long now = BlobStoreUtils.getCurrentTimeMs();
+      long runTime = now - startTimeMs;
+      long startLatency = now - this.queuedTimeMs;
+      log(getActionName(), collectionName, runTime, startLatency, isSuccess, getAdditionalLogMessage());
+    }
+    return new BlobDeleterTaskResult(this, filesDeleted, isSuccess, shouldRetry, err);
+  }
+  
+  /**
+   * Override-able by deletion tasks to provide additional action specific logging
+   */
+  public String getAdditionalLogMessage() {
+    return "";
+  }
+  
+  @Override
+  public String toString() {
+    return "collectionName=" + collectionName + " allowRetry=" + allowRetry + 
+        " queuedTimeMs=" + queuedTimeMs + " attemptsTried=" + attempt.get();
+  }
+  
+  public int getAttempts() {
+    return attempt.get();
+  }
+
+  public void log(String action, String collectionName, long runTime, long startLatency, boolean isSuccess, 
+      String additionalMessage) {
+    String message = String.format(Locale.ROOT, 
+        "action=%s collectionName=%s storageProvider=%s bucketRegion=%s bucketName=%s runTime=%s "
+        + "startLatency=%s attempt=%s isSuccess=%s %s",
+        action, collectionName, client.getStorageProvider().name(), client.getBucketRegion(), client.getBucketName(),
+        runTime, startLatency, attempt.get(), isSuccess, additionalMessage);
+    log.info(message);
+  }
+  
+  /**
+   * Represents the result of a deletion task
+   */
+  public static class BlobDeleterTaskResult {
+    private final BlobDeleterTask task;
+    private final Collection<String> filesDeleted;
+    private final boolean isSuccess;
+    private final boolean shouldRetry;
+    private final Throwable err;
+    
+    public BlobDeleterTaskResult(BlobDeleterTask task, Collection<String> filesDeleted, 
+        boolean isSuccess, boolean shouldRetry, Throwable err) {
+      this.task = task;
+      this.filesDeleted = filesDeleted;
+      this.isSuccess = isSuccess;
+      this.shouldRetry = shouldRetry;
+      this.err = err;
+    }
+    
+    public boolean isSuccess() {
+      return isSuccess;
+    }
+    
+    public boolean shouldRetry() {
+      return shouldRetry;
+    }
+    
+    public BlobDeleterTask getTask() {
+      return task;
+    }
+    
+    /**
+     * @return the files that are being deleted. Note if the task wasn't successful there is no guarantee
+     * all of these files were in fact deleted from shared storage
+     */
+    public Collection<String> getFilesDeleted() {
+      return filesDeleted;
+    }
+    
+    public Throwable getError() {
+      return err;
+    }
+  }
+  
+  /**
+   * A BlobDeleterTask that deletes a given set of blob files from shared store
+   */
+  public static class BlobFileDeletionTask extends BlobDeleterTask {
+    private final CoreStorageClient client;
+    private final Set<String> blobNames;
+    
+    /**
+     * Constructor for BlobDeleterTask that deletes a given set of blob files from shared store
+     */
+    public BlobFileDeletionTask(CoreStorageClient client, String collectionName, Set<String> blobNames, boolean allowRetry,
+        int maxRetryAttempt) {
+      super(client, collectionName, allowRetry, maxRetryAttempt);
+      this.blobNames = blobNames;
+      this.client = client;
+    }
+
+    @Override
+    public Collection<String> doDelete() throws Exception {
       client.deleteBlobs(blobNames);
-      // Blob might not have been deleted if at some point we've enqueued files to delete while doing a core push,
-      // but the push ended up failing and the core.metadata file was not updated. We ended up with the blobs enqueued for
-      // delete and eventually removed by a BlobDeleterTask and the files to delete still present in core.metadata
-      // so enqueued again.
-      // Note it is ok to delete these files even if the core.metadata update fails. The delete is not linked
-      // to the push activity, it is related to blobs marked for delete that can be safely removed after some delay has passed.
-      } catch (Exception e) {
-        isSuccess = false;
-        int attempts = attempt.incrementAndGet();
+      return blobNames;
+    }
+
+    @Override
+    public String getActionName() {
+      return "DELETE";
+    }
+    
+    @Override 
+    public String getAdditionalLogMessage() {
+      return "filesAffected=" + blobNames.size();
+    }
+    
+    @Override
+    public String toString() {
+      return "BlobFileDeletionTask action=" + getActionName() + " totalFilesSpecified=" + blobNames.size() + 
+          " " + super.toString();
+    }
+  }
+  
+  /**
+   * A BlobDeleterTask that deletes all files from shared storage with the given string prefix 
+   */
+  public static class BlobPrefixedFileDeletionTask extends BlobDeleterTask {
+    private final CoreStorageClient client;
+    private final String prefix;
+    
+    private int affectedFiles = 0;
+    
+    /**
+     * Constructor for BlobDeleterTask that deletes all files from shared storage with the given string prefix 
+     */
+    public BlobPrefixedFileDeletionTask(CoreStorageClient client, String collectionName, String prefix, boolean allowRetry,
+        int maxRetryAttempt) {
+      super(client, collectionName, allowRetry, maxRetryAttempt);
+      this.prefix = prefix;
+      this.client = client;
+    }
 
-        log.warn("Blob file delete task failed."
-                +" attempt=" + attempts +  " sharedBlobName=" + this.sharedBlobName + " numOfBlobs=" + this.blobNames.size(), e);
+    @Override
+    public List<String> doDelete() throws Exception {
+      List<String> allFiles = client.listCoreBlobFiles(prefix);
+      affectedFiles = allFiles.size();
+      client.deleteBlobs(allFiles);
+      return allFiles;
+    }
 
-        if (attempts < MAX_DELETE_ATTEMPTS) {
-          // We failed, but we'll try again. Enqueue the task for a new delete attempt. attempt already increased.
-          // Note this execute call accepts the
-          try {
-            // Some delay before retry... (could move this delay to before trying to delete a file that previously
-            // failed to be deleted, that way if the queue is busy and it took time to retry, we don't add an additional
-            // delay on top of that. On the other hand, an exception here could be an issue with the Blob store
-            // itself and nothing specific to the file at hand, so slowing all delete attempts for all files might
-            // make sense. Splunk will eventually tell us... or not.
-            Thread.sleep(SLEEP_MS_FAILED_ATTEMPT);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-          // This can throw an exception if the pool is shutting down.
-          executor.execute(this);
-        }
-      } finally {
-        long now = BlobStoreUtils.getCurrentTimeMs();
-        long runTimeMs = now - startTimeMs;
-        long startLatency = now - this.queuedTimeMs;
-        String message = String.format(Locale.ROOT,
-               "sharedBlobName=%s action=DELETE storageProvider=%s bucketRegion=%s bucketName=%s "
-                      + "runTime=%s startLatency=%s attempt=%s filesAffected=%s isSuccess=%s",
-                      sharedBlobName, client.getStorageProvider().name(), client.getBucketRegion(),
-                      client.getBucketName(), runTimeMs, startLatency, attempt.get(), this.blobNames.size(), isSuccess);
-        log.info(message);
-      }
+    @Override
+    public String getActionName() {
+      return "DELETE_FILES_PREFIXED";
+    }
+    
+    @Override 
+    public String getAdditionalLogMessage() {
+      return "filesAffected=" + affectedFiles;
+    }
+    
+    @Override
+    public String toString() {
+      return "BlobCollectionDeletionTask action=" + getActionName() + " " + super.toString();
+    }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
index 16a66b7..9fec3f4 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
@@ -84,9 +84,6 @@ public class SharedStoreManager {
     return blobStorageProvider;
   }
   
-  /*
-   * Initiates a BlobDeleteManager if it doesn't exist and returns one 
-   */
   public BlobDeleteManager getBlobDeleteManager() {
     if (blobDeleteManager != null) {
       return blobDeleteManager;
@@ -123,5 +120,13 @@ public class SharedStoreManager {
   public void initConcurrencyController(SharedCoreConcurrencyController concurrencyController) {
     this.sharedCoreConcurrencyController = concurrencyController;
   }
+  
+  @VisibleForTesting
+  public void initBlobDeleteManager(BlobDeleteManager blobDeleteManager) {
+    if (this.blobDeleteManager != null) {
+      // shut down the existing manager before replacing it
+      this.blobDeleteManager.shutdown();
+    }
+    this.blobDeleteManager = blobDeleteManager;
+  }
 
 }
\ No newline at end of file
diff --git a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
index d03da40..b35b178 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
@@ -24,6 +24,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
@@ -88,6 +89,24 @@ public class CoreStorageClientTest extends SolrTestCaseJ4 {
     int expectedBlobKeyLength = TEST_CORE_NAME_1.length() + uuid4length + 1 + 4;
     Assert.assertEquals(blobPath.length(), expectedBlobKeyLength);
   }
+  
+  @Test
+  public void testListBlobFiles() throws Exception {
+    List<String> expectedPaths = new LinkedList<>();
+    expectedPaths.add(
+        blobClient.pushStream(TEST_CORE_NAME_1, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "xxx"));
+    expectedPaths.add(
+        blobClient.pushStream(TEST_CORE_NAME_1, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "yyy"));
+    expectedPaths.add(
+        blobClient.pushStream(TEST_CORE_NAME_2, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "zzzz"));
+    expectedPaths.add(
+        blobClient.pushStream(TEST_CORE_NAME_2, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "1234"));
+    blobClient.pushStream("s_test_core_nomatch", new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "1234");
+    // the common prefix is s_test_core_name for these blob files
+    List<String> blobFiles = blobClient.listCoreBlobFiles("s_test_core_name");
+    Assert.assertEquals(expectedPaths.size(), blobFiles.size());
+    Assert.assertTrue(blobFiles.toString(), blobFiles.containsAll(expectedPaths));
+  }
     
   @Test
   public void testListBlobCommonPrefixes() throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java
new file mode 100644
index 0000000..aa7774d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.store.blob.client.BlobException;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.client.LocalStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobFileDeletionTask;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobPrefixedFileDeletionTask;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BlobDeleteProcessor}
+ */
+public class BlobDeleteProcessorTest extends SolrTestCaseJ4 {
+  
+  private static String DEFAULT_PROCESSOR_NAME = "DeleterForTest";
+  private static Path sharedStoreRootPath;
+  private static CoreStorageClient blobClient;
+  
+  private static List<BlobDeleterTask> enqueuedTasks;
+
+  @BeforeClass
+  public static void setupTestClass() throws Exception {
+    sharedStoreRootPath = createTempDir("tempDir");
+    System.setProperty(LocalStorageClient.BLOB_STORE_LOCAL_FS_ROOT_DIR_PROPERTY, sharedStoreRootPath.resolve("LocalBlobStore/").toString());
+    blobClient = new LocalStorageClient() {
+       
+      // no-op so that BlobFileDeletionTask and BlobPrefixedFileDeletionTask execute successfully
+      @Override
+      public void deleteBlobs(Collection<String> paths) throws BlobException {
+        return;
+      }
+
+      // no-op so that BlobFileDeletionTask and BlobPrefixedFileDeletionTask execute successfully
+      @Override
+      public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+        return new LinkedList<>();
+      }
+    };
+  }
+  
+  @Before
+  public void setup() {
+    enqueuedTasks = new LinkedList<BlobDeleterTask>();
+  }
+  
+  /**
+   * Verify we enqueue a {@link BlobFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}
+   */
+  @Test
+  public void testDeleteFilesEnqueueTask() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 5;
+    int retryDelay = 500; 
+    String name = "testName";
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      Set<String> names = new HashSet<>();
+      names.add("test1");
+      names.add("test2");
+      // uses the specified defaultMaxAttempts at the processor (not task) level 
+      CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteFiles(name, names, true);
+      // wait for this task and all its potential retries to finish
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      assertEquals(1, enqueuedTasks.size());
+      assertNotNull(res);
+      assertEquals(1, res.getTask().getAttempts());
+      assertTrue(res.isSuccess());
+      assertFalse(res.shouldRetry());
+    }
+  }
+  
+  /**
+   * Verify we enqueue a {@link BlobPrefixedFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}
+   */
+  @Test
+  public void testDeleteShardEnqueueTask() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 5;
+    int retryDelay = 500; 
+    String name = "testName";
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // uses the specified defaultMaxAttempts at the processor (not task) level 
+      CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteShard(name, name, true);
+      // wait for this task and all its potential retries to finish
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      assertEquals(1, enqueuedTasks.size());
+      assertNotNull(res);
+      assertEquals(1, res.getTask().getAttempts());
+      assertTrue(res.isSuccess());
+      assertFalse(res.shouldRetry());
+    }
+  }
+  
+  /**
+   * Verify we enqueue a {@link BlobPrefixedFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}
+   */
+  @Test
+  public void testDeleteCollectionEnqueueTask() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 5;
+    int retryDelay = 500; 
+    String name = "testName";
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // uses the specified defaultMaxAttempts at the processor (not task) level 
+      CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteCollection(name, true);
+      // wait for this task and all its potential retries to finish
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      assertEquals(1, enqueuedTasks.size());
+      assertNotNull(res);
+      assertEquals(1, res.getTask().getAttempts());
+      assertTrue(res.isSuccess());
+      assertFalse(res.shouldRetry());
+    }
+  }
+  
+  /**
+   * Verify that we don't retry tasks that are not configured to be retried
+   * and end up failing
+   */
+  @Test
+  public void testNonRetryableTask() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 1; // ignored when we build the test task
+    int retryDelay = 500;
+    int totalAttempts = 5; // total number of attempts the task should be tried 
+
+    String name = "testName";
+    boolean isRetry = false;
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // enqueue a task that fails and is not retryable
+      CompletableFuture<BlobDeleterTaskResult> cf = 
+          processor.enqueue(buildFailingTaskForTest(blobClient, name, totalAttempts, false), isRetry);
+      // wait for this task and all its potential retries to finish
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      
+      // the only attempt fails and no retries are enqueued
+      assertEquals(1, enqueuedTasks.size());
+      assertNotNull(res);
+      assertEquals(1, res.getTask().getAttempts());
+      assertFalse(res.isSuccess());
+      assertFalse(res.shouldRetry());
+  
+      // initial error + 0 retry errors suppressed
+      assertNotNull(res.getError());
+      assertEquals(0, res.getError().getSuppressed().length);
+    }
+  }
+  
+  /**
+   * Verify that the retry logic kicks in for tasks configured to retry
+   * and subsequent retry succeeds
+   */
+  @Test
+  public void testRetryableTaskSucceeds() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 1; // ignored when we build the test task
+    int retryDelay = 500;
+    int totalAttempts = 5; // total number of attempts the task should be tried 
+    int totalFails = 3; // total number of times the task should fail
+    
+    String name = "testName";
+    boolean isRetry = false;
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // enqueue a task that fails totalFails number of times before succeeding
+      CompletableFuture<BlobDeleterTaskResult> cf = 
+          processor.enqueue(buildScheduledFailingTaskForTest(blobClient, name, totalAttempts, true, totalFails), isRetry);
+      
+      // wait for this task and all its potential retries to finish
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      
+      // the first three attempts fail and the fourth succeeds
+      assertEquals(4, enqueuedTasks.size());
+      
+      assertNotNull(res);
+      assertEquals(4, res.getTask().getAttempts());
+      assertTrue(res.isSuccess());
+      
+      // initial error + 2 retry errors suppressed
+      assertNotNull(res.getError());
+      assertEquals(2, res.getError().getSuppressed().length);
+    }
+  }
+  
+  /**
+   * Verify that after all task attempts are exhausted we bail out
+   */
+  @Test
+  public void testRetryableTaskFails() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 1; // ignored when we build the test task
+    int retryDelay = 500;
+    int totalAttempts = 5; // total number of attempts the task should be tried
+    
+    String name = "testName";
+    boolean isRetry = false;
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // enqueue a task that fails every time it runs but is configured to retry
+      CompletableFuture<BlobDeleterTaskResult> cf = 
+          processor.enqueue(buildFailingTaskForTest(blobClient, name, totalAttempts, true), isRetry);
+      
+      // wait for this task and all its potential retries to finish 
+      BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+      // 1 initial enqueue + 4 retries
+      assertEquals(5, enqueuedTasks.size());
+      
+      assertNotNull(res);
+      assertEquals(5, res.getTask().getAttempts());
+      assertFalse(res.isSuccess());
+      // shouldRetry acts as a circuit breaker and must be false once all attempts are exhausted
+      assertFalse(res.shouldRetry());
+      
+      // initial error + 4 retry errors suppressed
+      assertNotNull(res.getError());
+      assertEquals(4, res.getError().getSuppressed().length);
+    }
+  }
+  
+  /**
+   * Verify that we cannot add more deletion tasks to the processor if the work queue
+   * is at its target max but that we can re-add tasks that are retries to the queue
+   */
+  @Test
+  public void testWorkQueueFull() throws Exception {
+    int maxQueueSize = 3;
+    int numThreads = 1;
+    int defaultMaxAttempts = 1;
+    int retryDelay = 1000;
+    
+    String name = "testName";
+    boolean allowRetry = false;
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      // numThreads is 1 and we'll enqueue a blocking task that ensures our pool
+      // will be occupied while we add new tasks subsequently to test enqueue rejection
+      CountDownLatch taskLatch = new CountDownLatch(1);
+      processor.enqueue(buildBlockingTaskForTest(taskLatch), allowRetry);
+  
+      // Fill the internal work queue past maxQueueSize; the limit is only approximately
+      // enforced (hence "almostMaxQueueSize"), so add well beyond the max to guarantee rejection
+      for (int i = 0; i < maxQueueSize*2; i++) {
+        try {
+          processor.deleteCollection(name, allowRetry);
+        } catch (Exception ex) {
+          // ignore
+        }
+      }
+      
+      // verify adding a new task is rejected
+      try {
+        processor.deleteCollection(name, allowRetry);
+        fail("Task should have been rejected");
+      } catch (Exception ex) {
+        assertTrue(ex.getMessage().contains("Unable to enqueue deletion"));
+      }
+      try {
+        // verify adding a task that is marked as a retry is not rejected
+        processor.enqueue(buildFailingTaskForTest(blobClient, name, 5, true), /* isRetry */ true);
+      } catch (Exception ex) {
+        fail("Task should not have been rejected");
+      }
+      
+      // clean up and unblock the blocking task
+      taskLatch.countDown();
+    }
+  }
+  
+  /**
+   * Verify that with a continuous stream of delete tasks being enqueued, all eventually complete
+   * successfully in the face of failing tasks and retries without locking up our pool anywhere
+   */
+  @Test
+  public void testSimpleConcurrentDeletionEnqueues() throws Exception {
+    int maxQueueSize = 200;
+    int numThreads = 5;
+    int defaultMaxAttempts = 5;
+    int retryDelay = 100;
+    int numberOfTasks = 200;
+    
+    try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+        maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+      List<BlobDeleterTask> tasks = generateRandomTasks(defaultMaxAttempts, numberOfTasks);
+      List<CompletableFuture<BlobDeleterTaskResult>> taskResultsFutures = new LinkedList<>();
+      List<BlobDeleterTaskResult> results = new LinkedList<>();
+      for (BlobDeleterTask t : tasks) {
+        taskResultsFutures.add(processor.enqueue(t, false));
+      }
+      
+      taskResultsFutures.forEach(cf -> {
+        try {
+          results.add(cf.get(20000, TimeUnit.MILLISECONDS));
+        } catch (Exception ex) {
+          fail("We timed out on some task!");
+        }
+      });
+      
+      // we should enqueue strictly fewer than (numberOfTasks * defaultMaxAttempts) tasks to the pool 
+      assertTrue(enqueuedTasks.size() < (numberOfTasks * defaultMaxAttempts));
+      assertEquals(numberOfTasks, results.size());
+      int totalAttempts = 0;
+      for (BlobDeleterTaskResult res : results) {
+        assertNotNull(res);
+        assertNotNull(res.getTask());
+        assertEquals("scheduledFailingTask", res.getTask().getActionName());
+        totalAttempts += res.getTask().getAttempts();
+      }
+      // total task attempts should be consistent with our test scaffolding
+      assertTrue(totalAttempts < (numberOfTasks * defaultMaxAttempts));
+    }
+  }
+  
+  private List<BlobDeleterTask> generateRandomTasks(int defaultMaxAttempts, int taskCount) {
+    List<BlobDeleterTask> tasks = new LinkedList<>();
+    for (int i = 0; i < taskCount; i++) {
+      int totalAttempts = random().nextInt(defaultMaxAttempts);
+      int totalFails = random().nextInt(defaultMaxAttempts + 1);
+      BlobDeleterTask task = buildScheduledFailingTaskForTest(blobClient, "test" + i, totalAttempts, true, totalFails);
+      tasks.add(task);
+    }
+    return tasks;
+  }
+  
+  /**
+   * Returns a test-only task that blocks on the given latch, keeping a deleter thread occupied
+   */
+  private BlobDeleterTask buildBlockingTaskForTest(CountDownLatch latch) {
+    return new BlobDeleterTask(null, null, false, 0) {
+      @Override
+      public Collection<String> doDelete() throws Exception {
+        // block until something forces this latch to count down
+        latch.await();
+        return new LinkedList<>();
+      }
+      
+      @Override
+      public String getActionName() { return "blockingTask"; }
+    };
+  }
+  
+  /**
+   * Returns a test-only task that always fails on action execution by throwing an
+   * exception
+   */
+  private BlobDeleterTask buildFailingTaskForTest(CoreStorageClient client, 
+      String collectionName, int maxRetries, boolean allowRetries) {
+    return new BlobDeleterTask(client, collectionName, allowRetries, maxRetries) {
+      @Override
+      public Collection<String> doDelete() throws Exception {
+        throw new Exception("");
+      }
+      
+      @Override
+      public String getActionName() { return "failingTask"; }
+    };
+  }
+  
+  /**
+   * Returns a test-only task that fails a specified number of times before succeeding
+   */
+  private BlobDeleterTask buildScheduledFailingTaskForTest(CoreStorageClient client, 
+      String collectionName, int maxRetries, boolean allowRetries, int failTotal) {
+    return new BlobDeleterTask(client, collectionName, allowRetries, maxRetries) {
+      private final AtomicInteger failCount = new AtomicInteger(0);
+      
+      @Override
+      public Collection<String> doDelete() throws Exception {
+        if (failCount.get() < failTotal) {
+          failCount.incrementAndGet();
+          throw new Exception("simulated deletion failure");
+        }
+        return new LinkedList<>();
+      }
+      
+      @Override
+      public String getActionName() { return "scheduledFailingTask"; }
+    };
+  }
+  
+  // enables capturing all enqueues to the executor pool, including retries
+  private BlobDeleteProcessor buildBlobDeleteProcessorForTest(List<BlobDeleterTask> enqueuedTasks, 
+      CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, int defaultMaxDeleteAttempts, 
+      long fixedRetryDelay) {
+    return new BlobDeleteProcessorForTest(DEFAULT_PROCESSOR_NAME, client, almostMaxQueueSize, numDeleterThreads,
+        defaultMaxDeleteAttempts, fixedRetryDelay, enqueuedTasks);
+  }
+  
+  class BlobDeleteProcessorForTest extends BlobDeleteProcessor {
+    List<BlobDeleterTask> enqueuedTasks;
+    
+    public BlobDeleteProcessorForTest(String name, CoreStorageClient client, int almostMaxQueueSize,
+        int numDeleterThreads, int defaultMaxDeleteAttempts, long fixedRetryDelay, List<BlobDeleterTask> enqueuedTasks) {
+      super(name, client, almostMaxQueueSize, numDeleterThreads, defaultMaxDeleteAttempts, fixedRetryDelay);
+      this.enqueuedTasks = enqueuedTasks;
+    }
+    
+    @Override
+    protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
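+      // record every enqueue, including internal retry re-enqueues, so tests can count attempts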
+      enqueuedTasks.add(task);
+      return super.enqueue(task, isRetry);
+    }
+  }
+}
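
The retry behavior these tests pin down: a retryable task is re-enqueued after a fixed
delay until its attempts are exhausted, its future always completes with a
BlobDeleterTaskResult rather than exceptionally, and every failure after the first is
attached to the first error as a suppressed exception. A minimal sketch of such a loop
over a plain ScheduledExecutorService follows; it is illustrative only, not the
BlobDeleteProcessor implementation from this commit, and the RetrySketch class and its
signatures are made up for the example.

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch: fixed-delay retries with the observable behavior the tests
    // assert (attempt counting, suppressed errors, a future that completes with a result).
    class RetrySketch {
      private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
      private final long fixedRetryDelayMs = 500;
      private final int maxAttempts = 5;

      CompletableFuture<Boolean> run(Callable<?> attempt) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        tryOnce(attempt, 1, null, result);
        return result;
      }

      private void tryOnce(Callable<?> attempt, int attemptNo, Exception firstError,
          CompletableFuture<Boolean> result) {
        pool.execute(() -> {
          try {
            attempt.call();
            result.complete(true); // success: complete with a result, never exceptionally
          } catch (Exception ex) {
            Exception primary = firstError;
            if (primary == null) {
              primary = ex; // the first failure becomes the primary error
            } else {
              primary.addSuppressed(ex); // later failures are suppressed under the first
            }
            if (attemptNo >= maxAttempts) {
              result.complete(false); // attempts exhausted: bail out with a failure result
            } else {
              final Exception carried = primary;
              pool.schedule(() -> tryOnce(attempt, attemptNo + 1, carried, result),
                  fixedRetryDelayMs, TimeUnit.MILLISECONDS);
            }
          }
        });
      }
    }
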
diff --git a/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java
new file mode 100644
index 0000000..179b5d7
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.metadata.DeleteBlobStrategyTest;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests around the deletion of files from shared storage via background deletion
+ * processes, as triggered by normal indexing and collection API calls
+ */
+public class SharedStoreDeletionProcessTest extends SolrCloudSharedStoreTestCase {
+  
+  private static final String DEFAULT_PROCESSOR_NAME = "DeleterForTest";
+  private static Path sharedStoreRootPath;
+  
+  private List<CompletableFuture<BlobDeleterTaskResult>> taskFutures;
+  private List<CompletableFuture<BlobDeleterTaskResult>> overseerTaskFutures;
+  
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    sharedStoreRootPath = createTempDir("tempDir");    
+  }
+  
+  @Before
+  public void setupTest() {
+    taskFutures = new LinkedList<>();
+    overseerTaskFutures = new LinkedList<>();
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    // clean up the shared store after each test. The temp dir should clean up itself after the
+    // test class finishes
+    FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+  }
+ 
+  /**
+   * Test that verifies that files marked for deletion during the indexing process get
+   * enqueued for deletion and deleted. This test case differs from {@link DeleteBlobStrategyTest}
+   * because it tests the end-to-end indexing flow with a {@link MiniSolrCloudCluster}
+   */
+  @Test
+  public void testIndexingTriggersDeletes() throws Exception {
+    setupCluster(1);
+    setupSolrNodes();
+    JettySolrRunner node = cluster.getJettySolrRunner(0);
+    int numThreads = 5;
+    int targetMaxAttempts = 5;
+    // files don't need to age before being marked for deletion; they are deleted as they are indexed in this test
+    int delay = 0;
+    initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+        taskFutures, overseerTaskFutures);
+    
+    // set up the collection
+    String collectionName = "SharedCollection";
+    int maxShardsPerNode = 1;
+    int numReplicas = 1;
+    String shardNames = "shard1";
+    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+    
+    // index and track deletions; commits are implicit in shared storage, so we expect a push per
+    // batch, and every batch except the very first should mark superseded index files for deletion
+    sendIndexingBatch(collectionName, /* numDocs */ 1, /* docIdStart */ 0);
+    assertEquals(0, taskFutures.size());
+    
+    // next indexing batch should cause files to be added for deletion
+    sendIndexingBatch(collectionName, /* numDocs */ 1, /* docIdStart */ 1);
+    assertEquals(1, taskFutures.size());
+    
+    CompletableFuture<BlobDeleterTaskResult> cf = taskFutures.get(0);
+    BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+    
+    // verify the files were deleted
+    CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+    assertTrue(result.isSuccess());
+    assertFilesDeleted(storageClient, result);
+  }
+  
+  /**
+   * Test that verifies that collection deletion command deletes all files for the given collection on the
+   * happy path
+   */
+  @Test
+  public void testDeleteCollectionCommand() throws Exception {
+    setupCluster(1);
+    setupSolrNodes();
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    JettySolrRunner node = cluster.getJettySolrRunner(0);
+    int numThreads = 5;
+    int targetMaxAttempts = 5;
+    // files don't need to age before being marked for deletion; they are deleted as they are indexed in this test
+    int delay = 0;
+    initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+        taskFutures, overseerTaskFutures);
+    
+    // set up the collection
+    String collectionName = "SharedCollection";
+    int maxShardsPerNode = 10;
+    int numReplicas = 1;
+    String shardNames = "shard1,shard2";
+    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+    
+    // index a bunch of docs
+    for (int i = 0; i < 10; i++) {
+      int numDocs = 100;
+      sendIndexingBatch(collectionName, numDocs, i*numDocs);
+    }
+    assertTrue(taskFutures.size() > 0);
+    assertEquals(0, overseerTaskFutures.size());
+    
+    // do collection deletion
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collectionName);
+    delete.process(cloudClient);
+    assertEquals(1, overseerTaskFutures.size());
+    
+    // wait for the deletion command to complete
+    CompletableFuture<BlobDeleterTaskResult> cf = overseerTaskFutures.get(0);
+    BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+    
+    // the collection should no longer exist on zookeeper
+    cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
+    assertNull(cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collectionName));
+    
+    // verify the files in the deletion tasks were all deleted
+    CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+    assertTrue(result.isSuccess());
+    assertFilesDeleted(storageClient, result);
+    
+    // verify no files belonging to this collection exist on shared storage
+    List<String> names = storageClient.listCoreBlobFiles(collectionName);
+    assertTrue(names.isEmpty());
+  }
+  
+  /**
+   * Test that verifies that shard deletion command deletes all files for the given shard on the
+   * happy path
+   */
+  @Test
+  public void testDeleteShardCommand() throws Exception {
+    setupCluster(1);
+    setupSolrNodes();
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    JettySolrRunner node = cluster.getJettySolrRunner(0);
+    int numThreads = 5;
+    int targetMaxAttempts = 5;
+    // files don't need to age before being marked for deletion; they are deleted as they are indexed in this test
+    int delay = 0;
+    initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+        taskFutures, overseerTaskFutures);
+    
+    // set up the collection
+    String collectionName = "SharedCollection";
+    int maxShardsPerNode = 10;
+    int numReplicas = 1;
+    String shardNames = "shard1";
+    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+    
+    // index a bunch of docs
+    for (int i = 0; i < 10; i++) {
+      int numDocs = 100;
+      sendIndexingBatch(collectionName, numDocs, i*numDocs);
+    }
+    assertTrue(taskFutures.size() > 0);
+    assertEquals(0, overseerTaskFutures.size());
+    
+    // split the shard so the parents are set inactive and can be deleted 
+    CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
+        .setShardName("shard1");
+    splitShard.process(cloudClient);
+    waitForState("Timed out waiting for sub shards to be active.",
+        collectionName, activeClusterShape(2, 3));
+    
+    // do shard deletion on the parent
+    CollectionAdminRequest.DeleteShard delete = CollectionAdminRequest.deleteShard(collectionName, "shard1");
+    delete.process(cloudClient);
+    assertEquals(1, overseerTaskFutures.size());
+    
+    // wait for the deletion command to complete
+    CompletableFuture<BlobDeleterTaskResult> cf = overseerTaskFutures.get(0);
+    BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+    
+    // the collection shard should no longer exist on zookeeper
+    cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
+    DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collectionName); 
+    assertNotNull(coll);
+    assertNull(coll.getSlice("shard1"));
+    
+    // verify the files in the deletion tasks were all deleted
+    CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+    assertTrue(result.isSuccess());
+    assertFilesDeleted(storageClient, result);
+    
+    // verify no files belonging to shard1 in the collection exist on shared storage; the trailing
+    // slash keeps the prefix from also matching the shard1_0 and shard1_1 sub-shard files
+    List<String> names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1/"));
+    assertTrue(names.isEmpty());
+    
+    // verify files belonging to shard1_0 and shard1_1 exist still
+    names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1_0"));
+    assertFalse(names.isEmpty());
+    names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1_1"));
+    assertFalse(names.isEmpty());
+  }
+  
+  private void assertFilesDeleted(CoreStorageClient storageClient, BlobDeleterTaskResult result) throws IOException {
+    Collection<String> filesDeleted = result.getFilesDeleted();
+    for (String filename : filesDeleted) {
+      // pulling a deleted file must fail; anything other than a wrapped FileNotFoundException is unexpected
+      try (InputStream s = storageClient.pullStream(filename)) {
+        fail(filename + " should have been deleted from shared store");
+      } catch (Exception ex) {
+        if (!(ex.getCause() instanceof FileNotFoundException)) {
+          fail("Unexpected exception thrown = " + ex.getMessage());
+        }
+      }
+    }
+  }
+  
+  private void sendIndexingBatch(String collectionName, int numberOfDocs, int docIdStart) throws SolrServerException, IOException {
+    UpdateRequest updateReq = new UpdateRequest();
+    for (int k = docIdStart; k < docIdStart + numberOfDocs; k++) {
+      updateReq.add("id", Integer.toString(k));
+    }
+    updateReq.process(cluster.getSolrClient(), collectionName);
+  }
+  
+  private BlobDeleteManager initiateDeleteManagerForTest(JettySolrRunner solrRunner, 
+      int defaultMaxAttempts, int numDeleterThreads, int deleteDelayMs, List<CompletableFuture<BlobDeleterTaskResult>> taskFutures,
+      List<CompletableFuture<BlobDeleterTaskResult>> overseerTaskFutures) {
+    CoreStorageClient client = solrRunner.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+    int maxQueueSize = 200;
+    int retryDelay = 500;
+    
+    // set up processors that honor the caller's settings but are enhanced to capture future results
+    BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(taskFutures, client,
+        maxQueueSize, numDeleterThreads, defaultMaxAttempts, retryDelay);
+    BlobDeleteProcessor overseerProcessor = buildBlobDeleteProcessorForTest(overseerTaskFutures, client,
+        maxQueueSize, numDeleterThreads, defaultMaxAttempts, retryDelay);
+    
+    BlobDeleteManager deleteManager = new BlobDeleteManager(client, deleteDelayMs, processor, overseerProcessor);
+    setupBlobDeleteManagerForNode(deleteManager, solrRunner);
+    return deleteManager;
+  }
+  
+  private void setupSolrNodes() throws Exception {
+    for (JettySolrRunner process : cluster.getJettySolrRunners()) {
+      CoreStorageClient storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
+      setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), process);
+    }
+  }
+  
+  // enables capturing all enqueues to the executor pool, including retries
+  private BlobDeleteProcessor buildBlobDeleteProcessorForTest(List<CompletableFuture<BlobDeleterTaskResult>> taskFutures, 
+      CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, int defaultMaxDeleteAttempts, 
+      long fixedRetryDelay) {
+    return new BlobDeleteProcessorForTest(DEFAULT_PROCESSOR_NAME, client, almostMaxQueueSize, numDeleterThreads,
+        defaultMaxDeleteAttempts, fixedRetryDelay, taskFutures);
+  }
+  
+  /**
+   * Test class extending BlobDeleteProcessor to allow capturing task futures
+   */
+  class BlobDeleteProcessorForTest extends BlobDeleteProcessor {
+    List<CompletableFuture<BlobDeleterTaskResult>> futures;
+    
+    public BlobDeleteProcessorForTest(String name, CoreStorageClient client, int almostMaxQueueSize,
+        int numDeleterThreads, int defaultMaxDeleteAttempts, long fixedRetryDelay, 
+        List<CompletableFuture<BlobDeleterTaskResult>> taskFutures) {
+      super(name, client, almostMaxQueueSize, numDeleterThreads, defaultMaxDeleteAttempts, fixedRetryDelay);
+      this.futures = taskFutures;
+    }
+    
+    @Override
+    protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
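+      // capture the future for every enqueue, including retries, so tests can await the final result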
+      CompletableFuture<BlobDeleterTaskResult> futureRes = super.enqueue(task, isRetry);
+      futures.add(futureRes);
+      return futureRes;
+    }
+  }
+}
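
The shard deletion assertions above depend on the prefix semantics of listCoreBlobFiles:
deleting shard1 must not touch the shard1_0 and shard1_1 sub-shards, which is why the
listing prefix ends with a separator. A self-contained illustration of the pitfall using
plain string prefixes follows; the key layout is hypothetical, not the actual output of
Assign.buildSharedShardName.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative sketch: why a bare shard-name prefix over-matches after a shard split.
    class PrefixListingSketch {
      public static void main(String[] args) {
        // hypothetical blob keys for a collection whose shard1 was split
        List<String> keys = Arrays.asList(
            "coll_shard1/segments_1", "coll_shard1/_0.cfs",
            "coll_shard1_0/segments_1", "coll_shard1_1/segments_1");

        // a bare "coll_shard1" prefix also matches both sub-shards
        List<String> bare = keys.stream()
            .filter(k -> k.startsWith("coll_shard1"))
            .collect(Collectors.toList());
        System.out.println(bare.size()); // 4 -- over-matches

        // a trailing separator scopes the match to shard1 alone
        List<String> scoped = keys.stream()
            .filter(k -> k.startsWith("coll_shard1/"))
            .collect(Collectors.toList());
        System.out.println(scoped.size()); // 2 -- only shard1's files
      }
    }
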
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
index c22f24f..909443c 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
@@ -28,11 +28,12 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.client.LocalStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
 import org.apache.solr.store.blob.process.BlobProcessUtil;
 import org.apache.solr.store.blob.process.CorePullTask;
+import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
 import org.apache.solr.store.blob.process.CorePullerFeeder;
 import org.apache.solr.store.blob.process.CoreSyncStatus;
-import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
 import org.apache.solr.store.blob.provider.BlobStorageProvider;
 
 /**
@@ -100,6 +101,14 @@ public class SolrCloudSharedStoreTestCase extends SolrCloudTestCase {
     SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
     manager.initConcurrencyController(concurrencyController);
   }
+  
+  /**
+   * Configures the Solr process with the given {@link BlobDeleteManager}
+   */
+  protected static void setupBlobDeleteManagerForNode(BlobDeleteManager deleteManager, JettySolrRunner solrRunner) {
+    SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
+    manager.initBlobDeleteManager(deleteManager);
+  }
 
   /**
    * Return a new CoreStorageClient that writes to the specified sharedStoreRootPath and blobDirectoryName