Posted to commits@lucene.apache.org by yo...@apache.org on 2020/02/06 02:21:07 UTC
[lucene-solr] branch jira/SOLR-13101 updated: SOLR-14044: Support
collection and shard deletion in shared storage (#1188)
This is an automated email from the ASF dual-hosted git repository.
yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/SOLR-13101 by this push:
new 3e8ca67 SOLR-14044: Support collection and shard deletion in shared storage (#1188)
3e8ca67 is described below
commit 3e8ca67f6f5d87b103148824e2bf158d5d5330da
Author: Andy Vuong <an...@users.noreply.github.com>
AuthorDate: Wed Feb 5 18:20:56 2020 -0800
SOLR-14044: Support collection and shard deletion in shared storage (#1188)
* Support collection and shard deletion in shared storage
* Add end to end collection api delete tests and fix local client test
* Fix timestamps
* Remove debug log line and fix timestamps
* Address review comments and fix test
* Close resource and throw exception on failure
---
.../cloud/api/collections/DeleteCollectionCmd.java | 45 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 36 +-
.../solr/store/blob/client/CoreStorageClient.java | 5 +
.../solr/store/blob/client/LocalStorageClient.java | 25 ++
.../solr/store/blob/client/S3StorageClient.java | 22 +
.../solr/store/blob/metadata/CorePushPull.java | 16 +-
.../solr/store/blob/process/BlobDeleteManager.java | 193 +++++----
.../store/blob/process/BlobDeleteProcessor.java | 176 ++++++++
.../solr/store/blob/process/BlobDeleterTask.java | 276 +++++++++---
.../solr/store/shared/SharedStoreManager.java | 11 +-
.../store/blob/client/CoreStorageClientTest.java | 19 +
.../blob/process/BlobDeleteProcessorTest.java | 464 +++++++++++++++++++++
.../process/SharedStoreDeletionProcessTest.java | 329 +++++++++++++++
.../store/shared/SolrCloudSharedStoreTestCase.java | 11 +-
14 files changed, 1465 insertions(+), 163 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 648f5ba..4eee49b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -18,6 +18,13 @@
package org.apache.solr.cloud.api.collections;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
@@ -25,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -47,17 +55,14 @@ import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.handler.admin.MetricsHistoryHandler;
import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SharedStoreManager;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
@@ -142,6 +147,32 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
break;
}
}
+
+ // Delete the collection files from shared store. We want to delete all of the files before we delete
+ // the collection state from ZooKeeper.
+ DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
+ if (docCollection != null && docCollection.getSharedIndex()) {
+ SharedStoreManager sharedStoreManager = ocmh.overseer.getCoreContainer().getSharedStoreManager();
+ BlobDeleteManager deleteManager = sharedStoreManager.getBlobDeleteManager();
+ BlobDeleteProcessor deleteProcessor = deleteManager.getOverseerDeleteProcessor();
+ // deletes all files belonging to this collection
+ CompletableFuture<BlobDeleterTaskResult> deleteFuture =
+ deleteProcessor.deleteCollection(collection, false);
+
+ BlobDeleterTaskResult result = null;
+ Throwable t = null;
+ try {
+ // TODO: Find a reasonable timeout value
+ result = deleteFuture.get(60, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ t = ex;
+ }
+ if (t != null || !result.isSuccess()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not complete deleting collection " +
+ collection + " from shared store, files belonging to this collection"
+ + " may be orphaned.", t);
+ }
+ }
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index ce9f29d..692b1f5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -31,13 +31,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -47,6 +50,10 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SharedStoreManager;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +82,8 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
log.info("Delete shard invoked");
- Slice slice = clusterState.getCollection(collectionName).getSlice(sliceId);
+ DocCollection docCollection = clusterState.getCollection(collectionName);
+ Slice slice = docCollection.getSlice(sliceId);
if (slice == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"No shard with name " + sliceId + " exists for collection " + collectionName);
@@ -136,7 +144,31 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
log.debug("Waiting for delete shard action to complete");
cleanupLatch.await(1, TimeUnit.MINUTES);
- ocmh.overseer.getShardSharedMetadataController().cleanUpMetadataNodes(collectionName, slice.getName());
+ if (docCollection != null && docCollection.getSharedIndex()) {
+ ocmh.overseer.getShardSharedMetadataController().cleanUpMetadataNodes(collectionName, slice.getName());
+
+ SharedStoreManager sharedStoreManager = ocmh.overseer.getCoreContainer().getSharedStoreManager();
+ BlobDeleteManager deleteManager = sharedStoreManager.getBlobDeleteManager();
+ BlobDeleteProcessor deleteProcessor = deleteManager.getOverseerDeleteProcessor();
+
+ String sharedShardName = (String) slice.get(ZkStateReader.SHARED_SHARD_NAME);
+ CompletableFuture<BlobDeleterTaskResult> deleteFuture =
+ deleteProcessor.deleteShard(collectionName, sharedShardName, false);
+
+ BlobDeleterTaskResult result = null;
+ Throwable t = null;
+ try {
+ // TODO: Find a reasonable timeout value
+ result = deleteFuture.get(60, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ t = ex;
+ }
+ if (t != null || !result.isSuccess()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not complete deleting shard " +
+ slice.getName() + " from shared store belonging to collection " + collectionName +
+ ". Files belonging to this shard may be orphaned.", t);
+ }
+ }
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
index 901b836..9a4872c 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
@@ -136,6 +136,11 @@ public interface CoreStorageClient {
public void shutdown();
/**
+ * Lists all file names with the given prefix
+ */
+ public List<String> listCoreBlobFiles(String prefix) throws BlobException;
+
+ /**
* Lists the blob file names of all of the files listed under a given core name's blob store
* hierarchy that are older than the given timestamp value in milliseconds. Important to
* note that the wall clock of your caller will vary with that of the blob store service
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
index a25e42b..4f8130a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/LocalStorageClient.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -33,6 +34,8 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class that handles reads and writes of solr blob files to the local file system.
@@ -237,6 +240,28 @@ public class LocalStorageClient implements CoreStorageClient {
public void shutdown() {
}
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ @Override
+ public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+ try {
+ String rootBlobDir = getBlobAbsolutePath("");
+ Path path = Paths.get(rootBlobDir);
+ List<String> blobFiles =
+ Files.walk(path).map(Path::toFile)
+ .filter(file -> (!file.isDirectory()))
+ .map(file -> {
+ // extracts just the file system blob file name without the root dir
+ return file.getAbsolutePath().substring(rootBlobDir.length());
+ })
+ .filter(name -> name.startsWith(prefix))
+ .distinct()
+ .collect(Collectors.toList());
+ return blobFiles;
+ } catch (Exception ex) {
+ throw new BlobException(ex);
+ }
+ }
@Override
public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
index c981b96..dc40697 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java
@@ -270,6 +270,28 @@ public class S3StorageClient implements CoreStorageClient {
public void shutdown() {
s3Client.shutdown();
}
+
+ @Override
+ public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+ ListObjectsRequest listRequest = new ListObjectsRequest();
+ listRequest.setBucketName(blobBucketName);
+ listRequest.setPrefix(prefix);
+
+ List<String> blobFiles = new LinkedList<>();
+ try {
+ ObjectListing objectListing = s3Client.listObjects(listRequest);
+ iterateObjectListingAndConsume(objectListing, object -> {
+ blobFiles.add(object.getKey());
+ });
+ return blobFiles;
+ } catch (AmazonServiceException ase) {
+ throw handleAmazonServiceException(ase);
+ } catch (AmazonClientException ace) {
+ throw new BlobClientException(ace);
+ } catch (Exception ex) {
+ throw new BlobException(ex);
+ }
+ }
@Override
public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
index c659b9b..ff362e2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
@@ -49,11 +48,14 @@ import org.apache.solr.store.blob.client.ToFromJson;
import org.apache.solr.store.blob.metadata.ServerSideMetadata.CoreFileData;
import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.blob.process.BlobDeleteProcessor;
import org.apache.solr.store.blob.util.BlobStoreUtils;
import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Class pushing updates from the local core to the Blob Store and pulling updates from Blob store to local core.
* This class knows about the Solr core directory structure and can translate it into file system abstractions since that's
@@ -510,6 +512,9 @@ public class CorePushPull {
return BlobStoreUtils.getCurrentTimeMs() - file.getDeletedAt() >= deleteManager.getDeleteDelayMs();
}
+ /**
+ * @return true if the files were enqueued for deletion successfully
+ */
@VisibleForTesting
protected boolean enqueueForDelete(String coreName, Set<BlobCoreMetadata.BlobFileToDelete> blobFiles) {
if (blobFiles == null || blobFiles.isEmpty()) {
@@ -518,7 +523,14 @@ public class CorePushPull {
Set<String> blobNames = blobFiles.stream()
.map(blobFile -> blobFile.getBlobName())
.collect(Collectors.toCollection(HashSet::new));
- return deleteManager.enqueueForDelete(coreName, blobNames);
+
+ BlobDeleteProcessor deleteProcessor = deleteManager.getDeleteProcessor();
+ try {
+ deleteProcessor.deleteFiles(coreName, blobNames, true);
+ return true;
+ } catch (Exception ex) {
+ return false;
+ }
}
/**
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
index f91680f..81ebd9e 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
@@ -18,40 +18,74 @@
package org.apache.solr.store.blob.process;
import java.lang.invoke.MethodHandles;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.lucene.util.NamedThreadFactory;
-import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.metadata.CorePushPull;
+import org.apache.solr.store.blob.metadata.ServerSideMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
- * Manager of blobs (files) to delete, putting them in a queue (if space left on the queue) then consumed and processed
- * by {@link BlobDeleterTask}
+ * This class manages the deletion machinery required by shared storage enabled collections. Its responsibilities
+ * include the allocation and management of bounded deletion task queues and their consumers.
+ *
+ * Deletion of blob files from the shared store happens via two paths:
+ * 1. In the indexing path, the local {@link SolrCore}'s index files, represented by an instance of a
+ * {@link ServerSideMetadata} object, are resolved against the blob store's core.metadata file, i.e.
+ * the source of truth for what index files a {@link SolrCore} should have. As the differences between
+ * these two metadata instances are resolved, we add files to be deleted to the BlobDeleteManager, which
+ * enqueues a {@link BlobDeleterTask} for asynchronous processing.
+ * 2. In the collection admin API, we may delete a collection or a collection shard. In the former, all index
+ * files belonging to the specified collection on shared storage should be deleted, while in the latter
+ * all index files belonging to a particular collection/shard pair should be deleted.
+ *
+ * Shard leaders are the only replicas receiving indexing traffic and pushing to shared store in a shared collection,
+ * so any Solr node in the cluster may be sending deletion requests to the shared storage provider at a given moment.
+ * Collection commands are only processed by the Overseer and therefore only the Overseer should be deleting entire
+ * collections or shard files from shared storage.
+ *
+ * The BlobDeleteManager maintains two queues to prevent any potential starvation: one for the incremental indexing
+ * deletion path, which is always started when a Solr node with shared collections starts up, and one that is only
+ * used when the current node is the Overseer and handles Overseer-specific actions.
*/
public class BlobDeleteManager {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
+ * Identifier for a BlobDeleteProcessor that runs on all Solr nodes containing shared collections
+ */
+ public static final String BLOB_FILE_DELETER = "BlobFileDeleter";
+
+ /**
+ * Identifier for a BlobDeleteProcessor that runs on the Overseer if the Solr cluster contains
+ * any shared collection
+ */
+ public static final String OVERSEER_BLOB_FILE_DELETER = "OverseerBlobFileDeleter";
+
+ /**
* Limit to the number of blob files to delete accepted on the delete queue (and lost in case of server crash). When
* the queue reaches that size, no more deletes are accepted (will be retried later for a core, next time it is pushed).
- * (note that tests in searchserver.blobstore.metadata.CorePushTest trigger a merge that enqueues more than 100 files to
- * be deleted. If that constant is reduced to 100 for example, some enqueues in the test will fail and there will be
- * files left to be deleted where we've expected none. So don't reduce it too much :)
*/
- private static final int ALMOST_MAX_DELETER_QUEUE_SIZE = 200;
+ private static final int DEFAULT_ALMOST_MAX_DELETER_QUEUE_SIZE = 200;
+
+ private static final int DEFAULT_THREAD_POOL_SIZE = 5;
- private final CoreStorageClient client;
- private final RejecterThreadPoolExecutor deleteExecutor;
+ // TODO: 30 seconds is currently chosen arbitrarily; these values should be configurable
+ private static final int DEFAULT_DELETE_DELAY_MS = 30000;
+
+ private static int MAX_DELETE_ATTEMPTS = 50;
+ private static long SLEEP_MS_FAILED_ATTEMPT = TimeUnit.SECONDS.toMillis(10);
+
+ private final CoreStorageClient storageClient;
+
+ private final BlobDeleteProcessor deleteProcessor;
+ private final BlobDeleteProcessor overseerDeleteProcessor;
/**
* After a core push has marked a file as deleted, wait at least this long before actually deleting its blob from the
@@ -71,92 +105,81 @@ public class BlobDeleteManager {
* delete until we know for sure the file can be resuscitated...
*/
private final long deleteDelayMs;
+
+ private AtomicBoolean isShutdown;
/**
- * TODO : Creates a default delete client, should have config based one
+ * Creates a new BlobDeleteManager with the provided {@link CoreStorageClient} and instantiates
+ * it with a default deleteDelayMs, queue size, and thread pool size. A default {@link BlobDeleteProcessor}
+ * is also created.
*/
public BlobDeleteManager(CoreStorageClient client) {
- // 30 seconds
- this(client, ALMOST_MAX_DELETER_QUEUE_SIZE, 5, 30000);
+ this(client, DEFAULT_ALMOST_MAX_DELETER_QUEUE_SIZE, DEFAULT_THREAD_POOL_SIZE, DEFAULT_DELETE_DELAY_MS);
}
+ /**
+ * Creates a new BlobDeleteManager with the specified BlobDeleteProcessors.
+ */
+ @VisibleForTesting
+ public BlobDeleteManager(CoreStorageClient client, long deleteDelayMs, BlobDeleteProcessor deleteProcessor,
+ BlobDeleteProcessor overseerDeleteProcessor) {
+ this.deleteDelayMs = deleteDelayMs;
+ this.storageClient = client;
+ this.isShutdown = new AtomicBoolean(false);
+ this.deleteProcessor = deleteProcessor;
+ this.overseerDeleteProcessor = overseerDeleteProcessor;
+ }
+
+ /**
+ * Creates a new BlobDeleteManager and default {@link BlobDeleteProcessor}.
+ */
public BlobDeleteManager(CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, long deleteDelayMs) {
- NamedThreadFactory threadFactory = new NamedThreadFactory("BlobFileDeleter");
-
- // Note this queue MUST NOT BE BOUNDED, or we risk deadlocks given that BlobDeleterTask's reenqueue themselves upon failure
- BlockingQueue<Runnable> deleteQueue = new LinkedBlockingDeque<>();
-
- deleteExecutor = new RejecterThreadPoolExecutor(numDeleterThreads, deleteQueue, almostMaxQueueSize, threadFactory);
-
this.deleteDelayMs = deleteDelayMs;
- this.client = client;
+ this.storageClient = client;
+ this.isShutdown = new AtomicBoolean(false);
+
+ deleteProcessor = new BlobDeleteProcessor(BLOB_FILE_DELETER, storageClient, almostMaxQueueSize, numDeleterThreads,
+ MAX_DELETE_ATTEMPTS, SLEEP_MS_FAILED_ATTEMPT);
+
+ // Non-Overseer nodes also instantiate this delete processor, but the underlying pool will sit idle
+ // until the node is elected Overseer and tasks are added. The overhead should be small.
+ overseerDeleteProcessor = new BlobDeleteProcessor(OVERSEER_BLOB_FILE_DELETER, storageClient, almostMaxQueueSize, numDeleterThreads,
+ MAX_DELETE_ATTEMPTS, SLEEP_MS_FAILED_ATTEMPT);
}
-
- public void shutdown() {
- deleteExecutor.shutdown();
+
+ /**
+ * Returns the delete processor used for shared store deletions issued by the normal indexing flow
+ */
+ public BlobDeleteProcessor getDeleteProcessor() {
+ return deleteProcessor;
}
-
- public long getDeleteDelayMs() {
- return deleteDelayMs;
+
+ /**
+ * Returns a delete processor specifically allocated for Overseer use
+ */
+ public BlobDeleteProcessor getOverseerDeleteProcessor() {
+ return overseerDeleteProcessor;
}
/**
- * This method is called 'externally" (i.e. not by tasks needing to reenqueue) and enq
- * @return <code>true</code> if the delete was enqueued, <code>false</code> if can't be enqueued (deleter turned off
- * by config or current queue of blobs file deletes too full).
+ * Shuts down all {@link BlobDeleteProcessor} instances currently managed by the BlobDeleteManager
*/
- public boolean enqueueForDelete(String sharedBlobName, Set<String> blobNames) {
- BlobDeleterTask command = new BlobDeleterTask(client, sharedBlobName, blobNames, deleteExecutor);
- return deleteExecutor.executeIfPossible(command);
+ public void shutdown() {
+ isShutdown.set(true);
+ log.info("BlobDeleteManager is shutting down");
+
+ deleteProcessor.close();
+ log.info("BlobDeleteProcessor " + deleteProcessor.getName() + " has shutdown");
+ overseerDeleteProcessor.close();
+ log.info("BlobDeleteProcessor " + overseerDeleteProcessor.getName() + " has shutdown");
}
/**
- * Subclass of {@link ThreadPoolExecutor} that has an additional command enqueue method {@link #executeIfPossible(Runnable)}
- * that rejects the enqueue if the underlying queue is over a configured size.<p>
- * The created thread pool executor has a fixed number of threads because the undelying blocking queue is unbounded.
+ * Get the delay that a file being deleted via the indexing flow must wait before it is
+ * eligible for deletion from shared storage. See {@link BlobDeleteManager#deleteDelayMs} for more
+ * details
*/
- private class RejecterThreadPoolExecutor extends MDCAwareThreadPoolExecutor {
- private final int targetMaxQueueSize;
- /**
- * @param poolSize the number of threads to keep in the pool. There is a fixed number of threads in that pool,
- * because a {@link ThreadPoolExecutor} using an unbounded queue will not have the pool create more threads
- * than the core pool size, even tasks are slow to execute.
- * @param workQueue the queue to use for holding tasks before they are
- * executed. This queue will hold only the {@code Runnable}
- * tasks submitted by the {@code execute} method.
- * @param targetMaxQueueSize max queue size to accept enqueues through {@link #executeIfPossible(Runnable)} but having
- * no impact on enqueues through {@link #execute(Runnable)}.
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- */
- RejecterThreadPoolExecutor(int poolSize,
- BlockingQueue<Runnable> workQueue,
- int targetMaxQueueSize,
- ThreadFactory threadFactory) {
- super(poolSize, poolSize, 0L, TimeUnit.SECONDS, workQueue, threadFactory);
- this.targetMaxQueueSize = targetMaxQueueSize;
- }
-
- /**
- * Enqueues the passed <code>command</code> for execution just like a call to superclass' {@link ThreadPoolExecutor#execute(Runnable)}
- * if the work queue is not too full (below or at size <code>targetMaxQueueSize</code> passed in the constructor).
- * @param command the task to execute
- * @return <code>true</code> if the <code>command</code> was accepted and enqueued for execution (or executed),
- * <code>false</code> if the underlying queue was too full and the <code>command</code> was not enqueued for execution
- * (i.e. nothing happened).
- */
- boolean executeIfPossible(Runnable command) {
- if (getQueue().size() > targetMaxQueueSize) {
- return false;
- }
-
- try {
- execute(command);
- } catch (RejectedExecutionException ree) {
- // pool might be shutting down
- return false;
- }
- return true;
- }
+ public long getDeleteDelayMs() {
+ return deleteDelayMs;
}
}
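For orientation, the pieces above can be exercised roughly as in the following sketch (illustrative only, not part of this commit): it wires a BlobDeleteManager on top of the LocalStorageClient implementation shown earlier, obtains the Overseer-side processor, and blocks on a collection deletion the same way DeleteCollectionCmd does above. The root directory, collection name, and timeout are placeholder values.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.client.LocalStorageClient;
import org.apache.solr.store.blob.process.BlobDeleteManager;
import org.apache.solr.store.blob.process.BlobDeleteProcessor;
import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;

public class BlobDeleteManagerSketch {
  public static void main(String[] args) throws Exception {
    // LocalStorageClient resolves its root directory from a system property (see the test changes below);
    // the directory is assumed to exist.
    System.setProperty(LocalStorageClient.BLOB_STORE_LOCAL_FS_ROOT_DIR_PROPERTY, "/tmp/LocalBlobStore/");
    CoreStorageClient client = new LocalStorageClient();

    // Default queue size, thread pool size and delete delay (see the single-argument constructor above).
    BlobDeleteManager manager = new BlobDeleteManager(client);

    // The indexing-path processor exists on every node; the Overseer processor is only
    // exercised on the node currently elected Overseer.
    BlobDeleteProcessor overseerProcessor = manager.getOverseerDeleteProcessor();

    // Enqueue a collection deletion and block on the result, mirroring DeleteCollectionCmd above.
    CompletableFuture<BlobDeleterTaskResult> future =
        overseerProcessor.deleteCollection("myCollection", false);
    BlobDeleterTaskResult result = future.get(60, TimeUnit.SECONDS);
    System.out.println("success=" + result.isSuccess() + " filesDeleted=" + result.getFilesDeleted());

    manager.shutdown();
  }
}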
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java
new file mode 100644
index 0000000..f15fbb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteProcessor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
+import org.apache.solr.store.blob.client.BlobClientUtils;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobFileDeletionTask;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobPrefixedFileDeletionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A generic deletion processor used for deleting object files from shared
+ * storage. Each processor manages its own bounded task queue and thread pool for processing
+ * {@link BlobDeleterTask}s asynchronously. Processors support retrying tasks if
+ * necessary, but retry decisions are left to the individual task implementations.
+ *
+ * Instances of {@link BlobDeleteProcessor} are managed by the {@link BlobDeleteManager}.
+ */
+public class BlobDeleteProcessor implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final String name;
+ private final int almostMaxQueueSize;
+ /**
+ * Note we sleep() after each failed attempt, so multiply this value by {@link #fixedRetryDelay} to find
+ * out how long we'll retry (at least) if Blob access fails for some reason ("at least" because we
+ * re-enqueue at the tail of the queue ({@link BlobDeleteManager} creates a list), so there might be additional
+ * processing delay if the queue is not empty and is processed before the enqueued retry is processed).
+ */
+ private final int maxDeleteAttempts;
+ private final long fixedRetryDelay;
+ private final CoreStorageClient client;
+ private final BlockingQueue<Runnable> deleteQueue;
+ private final MDCAwareThreadPoolExecutor deleteExecutor;
+
+ /**
+ * @param name identifying the processor
+ * @param client the storage client to use in this processor
+ * @param almostMaxQueueSize the target max queue size
+ * @param numDeleterThreads the number of threads to configure in the underlying thread pool
+ * @param defaultMaxDeleteAttempts maximum number of attempts to retry any task enqueued in this processor
+ * @param fixedRetryDelay fixed time delay in ms between retry attempts
+ */
+ public BlobDeleteProcessor(String name, CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads,
+ int defaultMaxDeleteAttempts, long fixedRetryDelay) {
+ this.name = name;
+ this.almostMaxQueueSize = almostMaxQueueSize;
+ this.maxDeleteAttempts = defaultMaxDeleteAttempts;
+ this.fixedRetryDelay = fixedRetryDelay;
+ NamedThreadFactory threadFactory = new NamedThreadFactory(name);
+
+ // Note this queue MUST NOT BE BOUNDED, or we risk deadlocks given that BlobDeleterTask's
+ // re-enqueue themselves upon failure
+ deleteQueue = new LinkedBlockingDeque<>();
+
+ deleteExecutor = new MDCAwareThreadPoolExecutor(numDeleterThreads, numDeleterThreads, 0L, TimeUnit.SECONDS, deleteQueue, threadFactory);
+ this.client = client;
+ }
+
+ /**
+ * Enqueues the given set of files for deletion from shared store as an async task.
+ *
+ * @param collectionName the name of the collection the files belong to
+ * @param blobNames set of file paths to delete from shared store
+ * @param allowRetry flag indicating if the task should be retried if it fails
+ */
+ public CompletableFuture<BlobDeleterTaskResult> deleteFiles(String collectionName, Set<String> blobNames, boolean allowRetry) {
+ BlobDeleterTask task = new BlobFileDeletionTask(client, collectionName, blobNames,
+ allowRetry, maxDeleteAttempts);
+ return enqueue(task, false);
+ }
+
+ /**
+ * Enqueues a task to delete all files belonging to the given collection from shared store as an async task.
+ *
+ * @param collectionName the name of the collection to be deleted from shared store
+ * @param allowRetry flag indicating if the task should be retried if it fails
+ */
+ public CompletableFuture<BlobDeleterTaskResult> deleteCollection(String collectionName, boolean allowRetry) {
+ BlobDeleterTask task = new BlobPrefixedFileDeletionTask(client, collectionName, collectionName,
+ allowRetry, maxDeleteAttempts);
+ return enqueue(task, false);
+ }
+
+ /**
+ * Enqueues a task to delete all files belonging to the given collection and {@link ZkStateReader#SHARED_SHARD_NAME}
+ * from shared store as an async task.
+ *
+ * @param collectionName the name of the collection the files belong to
+ * @param sharedShardName the identifier for the shardId located on the shared store
+ * @param allowRetry flag indicating if the task should be retried if it fails
+ */
+ public CompletableFuture<BlobDeleterTaskResult> deleteShard(String collectionName, String sharedShardName, boolean allowRetry) {
+ BlobDeleterTask task = new BlobPrefixedFileDeletionTask(client, collectionName,
+ sharedShardName + BlobClientUtils.BLOB_FILE_PATH_DELIMITER, allowRetry, maxDeleteAttempts);
+ return enqueue(task, false);
+ }
+
+ /**
+ * Enqueues a task to be processed by a thread in the {@link BlobDeleteProcessor#deleteExecutor} thread
+ * pool. The callback is handled by the same execution thread and will re-enqueue a task that has failed
+ * and should be retried. Tasks that are enqueued via the retry mechanism are not bound by the same size
+ * constraints as newly minted tasks are.
+ *
+ * @return a CompletableFuture allowing calling threads to block on the
+ * computation result as needed, retrieve suppressed exceptions from retries, etc.
+ */
+ @VisibleForTesting
+ protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
+ if (!isRetry && deleteQueue.size() > almostMaxQueueSize) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to enqueue deletion task: " + task.toString());
+ }
+
+ return CompletableFuture.supplyAsync(() -> {
+ return task.call();
+ }, deleteExecutor)
+ .thenCompose(result -> {
+ // the callback will execute on the same thread as the executing task
+ if (!result.isSuccess() && result.shouldRetry()) {
+ try {
+ // Some delay before retry... (could move this delay to before trying to delete a file that previously
+ // failed to be deleted, that way if the queue is busy and it took time to retry, we don't add an additional
+ // delay on top of that. On the other hand, an exception here could be an issue with the Blob store
+ // itself and nothing specific to the file at hand, so slowing all delete attempts for all files might
+ // make sense.)
+ Thread.sleep(fixedRetryDelay);
+ return enqueue(result.getTask(), result.shouldRetry());
+ } catch (Exception ex) {
+ log.error("Could not re-enqueue failed deleter task that should have been enqueued!", ex);
+ }
+ }
+ return CompletableFuture.completedFuture(result);
+ });
+ }
+
+ @Override
+ public void close() {
+ deleteExecutor.shutdown();
+ }
+
+ public String getName() {
+ return name;
+ }
+}
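The enqueue() method above retries by composing a fresh attempt onto the original future with thenCompose, so the CompletableFuture handed back to the caller only completes once the task either succeeds or exhausts its retries. The following standalone sketch shows just that composition pattern in plain Java, with no Solr dependencies; the executor size, retry limit, and delay are arbitrary.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryComposeSketch {
  static final ExecutorService pool = Executors.newFixedThreadPool(2);
  static final AtomicInteger attempts = new AtomicInteger();

  // Mirrors the enqueue() pattern above: run the task, and if it reports failure,
  // sleep briefly and re-enqueue by composing another attempt onto the same chain.
  static CompletableFuture<Boolean> enqueue() {
    return CompletableFuture.supplyAsync(RetryComposeSketch::tryOnce, pool)
        .thenCompose(ok -> {
          if (!ok && attempts.get() < 3) {
            try {
              Thread.sleep(100); // fixed delay between attempts, as in the processor above
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
            return enqueue(); // caller's future only completes with the final outcome
          }
          return CompletableFuture.completedFuture(ok);
        });
  }

  static boolean tryOnce() {
    // Fail the first two attempts, succeed on the third.
    return attempts.incrementAndGet() >= 3;
  }

  public static void main(String[] args) throws Exception {
    System.out.println("success=" + enqueue().get(5, TimeUnit.SECONDS) + " attempts=" + attempts.get());
    pool.shutdown();
  }
}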
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
index c2eaa53..e3fdecd 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleterTask.java
@@ -18,95 +18,245 @@
package org.apache.solr.store.blob.process;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Locale;
import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
import org.apache.solr.store.blob.util.BlobStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Task in charge of deleting Blobs (files) from blob store.
+ * Generic deletion task for files located on shared storage
*/
-class BlobDeleterTask implements Runnable {
+public abstract class BlobDeleterTask implements Callable<BlobDeleterTaskResult> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- /**
- * Note we sleep() after each failed attempt, so multiply this value by {@link #SLEEP_MS_FAILED_ATTEMPT} to find
- * out how long we'll retry (at least) if Blob access fails for some reason ("at least" because we
- * re-enqueue at the tail of the queue ({@link BlobDeleteManager} creates a list), so there might be additional
- * processing delay if the queue is not empty and is processed before the enqueued retry is processed).
- */
- private static int MAX_DELETE_ATTEMPTS = 50;
- private static long SLEEP_MS_FAILED_ATTEMPT = TimeUnit.SECONDS.toMillis(10);
-
+
private final CoreStorageClient client;
- private final String sharedBlobName;
- private final Set<String> blobNames;
+ private final String collectionName;
private final AtomicInteger attempt;
- private final ThreadPoolExecutor executor;
+
private final long queuedTimeMs;
+ private final int maxAttempts;
+ private final boolean allowRetry;
+ private Throwable err;
- BlobDeleterTask(CoreStorageClient client, String sharedBlobName, Set<String> blobNames, ThreadPoolExecutor executor) {
- this.client = client;
- this.sharedBlobName = sharedBlobName;
- this.blobNames = blobNames;
+ public BlobDeleterTask(CoreStorageClient client, String collectionName, boolean allowRetry,
+ int maxAttempts) {
+ this.client = client;
+ this.collectionName = collectionName;
this.attempt = new AtomicInteger(0);
- this.executor = executor;
this.queuedTimeMs = BlobStoreUtils.getCurrentTimeMs();
+ this.allowRetry = allowRetry;
+ this.maxAttempts = maxAttempts;
}
-
+
+ /**
+ * Performs the deletion request against shared storage for the given collection
+ * and returns the list of file paths deleted
+ */
+ public abstract Collection<String> doDelete() throws Exception;
+
+ /**
+ * Return a String representing the action performed by the BlobDeleterTask for logging purposes
+ */
+ public abstract String getActionName();
+
@Override
- public void run() {
+ public BlobDeleterTaskResult call() {
+ List<String> filesDeleted = new LinkedList<>();
final long startTimeMs = BlobStoreUtils.getCurrentTimeMs();
boolean isSuccess = true;
-
+ boolean shouldRetry = false;
try {
+ filesDeleted.addAll(doDelete());
+ attempt.incrementAndGet();
+ return new BlobDeleterTaskResult(this, filesDeleted, isSuccess, shouldRetry, err);
+ } catch (Exception ex) {
+ if (err == null) {
+ err = ex;
+ } else {
+ err.addSuppressed(ex);
+ }
+ int attempts = attempt.incrementAndGet();
+ isSuccess = false;
+ log.warn("BlobDeleterTask failed on attempt=" + attempts + " collection=" + collectionName
+ + " task=" + toString(), ex);
+ if (allowRetry) {
+ if (attempts < maxAttempts) {
+ shouldRetry = true;
+ } else {
+ log.warn("Reached " + maxAttempts + " attempt limit for deletion task " + toString() +
+ ". This task won't be retried.");
+ }
+ }
+ } finally {
+ long now = System.nanoTime() / 1000000;
+ long runTime = now - startTimeMs;
+ long startLatency = now - this.queuedTimeMs;
+ log(getActionName(), collectionName, runTime, startLatency, isSuccess, getAdditionalLogMessage());
+ }
+ return new BlobDeleterTaskResult(this, filesDeleted, isSuccess, shouldRetry, err);
+ }
+
+ /**
+ * Overridable by deletion tasks to provide additional action-specific logging
+ */
+ public String getAdditionalLogMessage() {
+ return "";
+ }
+
+ @Override
+ public String toString() {
+ return "collectionName=" + collectionName + " allowRetry=" + allowRetry +
+ " queuedTimeMs=" + queuedTimeMs + " attemptsTried=" + attempt.get();
+ }
+
+ public int getAttempts() {
+ return attempt.get();
+ }
+
+ public void log(String action, String collectionName, long runTime, long startLatency, boolean isSuccess,
+ String additionalMessage) {
+ String message = String.format(Locale.ROOT,
+ "action=%s storageProvider=%s bucketRegion=%s bucketName=%s, runTime=%s "
+ + "startLatency=%s attempt=%s isSuccess=%s %s",
+ action, client.getStorageProvider().name(), client.getBucketRegion(), client.getBucketName(),
+ runTime, startLatency, attempt.get(), isSuccess, additionalMessage);
+ log.info(message);
+ }
+
+ /**
+ * Represents the result of a deletion task
+ */
+ public static class BlobDeleterTaskResult {
+ private final BlobDeleterTask task;
+ private final Collection<String> filesDeleted;
+ private final boolean isSuccess;
+ private final boolean shouldRetry;
+ private final Throwable err;
+
+ public BlobDeleterTaskResult(BlobDeleterTask task, Collection<String> filesDeleted,
+ boolean isSuccess, boolean shouldRetry, Throwable errs) {
+ this.task = task;
+ this.filesDeleted = filesDeleted;
+ this.isSuccess = isSuccess;
+ this.shouldRetry = shouldRetry;
+ this.err = errs;
+ }
+
+ public boolean isSuccess() {
+ return isSuccess;
+ }
+
+ public boolean shouldRetry() {
+ return shouldRetry;
+ }
+
+ public BlobDeleterTask getTask() {
+ return task;
+ }
+
+ /**
+ * @return the files that are being deleted. Note if the task wasn't successful there is no guarantee
+ * all of these files were in fact deleted from shared storage
+ */
+ public Collection<String> getFilesDeleted() {
+ return filesDeleted;
+ }
+
+ public Throwable getError() {
+ return err;
+ }
+ }
+
+ /**
+ * A BlobDeleterTask that deletes a given set of blob files from shared store
+ */
+ public static class BlobFileDeletionTask extends BlobDeleterTask {
+ private final CoreStorageClient client;
+ private final Set<String> blobNames;
+
+ /**
+ * Constructor for BlobDeleterTask that deletes a given set of blob files from shared store
+ */
+ public BlobFileDeletionTask(CoreStorageClient client, String collectionName, Set<String> blobNames, boolean allowRetry,
+ int maxRetryAttempt) {
+ super(client, collectionName, allowRetry, maxRetryAttempt);
+ this.blobNames = blobNames;
+ this.client = client;
+ }
+
+ @Override
+ public Collection<String> doDelete() throws Exception {
client.deleteBlobs(blobNames);
- // Blob might not have been deleted if at some point we've enqueued files to delete while doing a core push,
- // but the push ended up failing and the core.metadata file was not updated. We ended up with the blobs enqueued for
- // delete and eventually removed by a BlobDeleterTask and the files to delete still present in core.metadata
- // so enqueued again.
- // Note it is ok to delete these files even if the core.metadata update fails. The delete is not linked
- // to the push activity, it is related to blobs marked for delete that can be safely removed after some delay has passed.
- } catch (Exception e) {
- isSuccess = false;
- int attempts = attempt.incrementAndGet();
+ return blobNames;
+ }
+
+ @Override
+ public String getActionName() {
+ return "DELETE";
+ }
+
+ @Override
+ public String getAdditionalLogMessage() {
+ return "filesAffected=" + blobNames.size();
+ }
+
+ @Override
+ public String toString() {
+ return "BlobFileDeletionTask action=" + getActionName() + " totalFilesSpecified=" + blobNames.size() +
+ " " + super.toString();
+ }
+ }
+
+ /**
+ * A BlobDeleterTask that deletes all files from shared storage with the given string prefix
+ */
+ public static class BlobPrefixedFileDeletionTask extends BlobDeleterTask {
+ private final CoreStorageClient client;
+ private final String prefix;
+
+ private int affectedFiles = 0;
+
+ /**
+ * Constructor for BlobDeleterTask that deletes all files from shared storage with the given string prefix
+ */
+ public BlobPrefixedFileDeletionTask(CoreStorageClient client, String collectionName, String prefix, boolean allowRetry,
+ int maxRetryAttempt) {
+ super(client, collectionName, allowRetry, maxRetryAttempt);
+ this.prefix = prefix;
+ this.client = client;
+ }
- log.warn("Blob file delete task failed."
- +" attempt=" + attempts + " sharedBlobName=" + this.sharedBlobName + " numOfBlobs=" + this.blobNames.size(), e);
+ @Override
+ public List<String> doDelete() throws Exception {
+ List<String> allFiles = client.listCoreBlobFiles(prefix);
+ affectedFiles = allFiles.size();
+ client.deleteBlobs(allFiles);
+ return allFiles;
+ }
- if (attempts < MAX_DELETE_ATTEMPTS) {
- // We failed, but we'll try again. Enqueue the task for a new delete attempt. attempt already increased.
- // Note this execute call accepts the
- try {
- // Some delay before retry... (could move this delay to before trying to delete a file that previously
- // failed to be deleted, that way if the queue is busy and it took time to retry, we don't add an additional
- // delay on top of that. On the other hand, an exception here could be an issue with the Blob store
- // itself and nothing specific to the file at hand, so slowing all delete attempts for all files might
- // make sense. Splunk will eventually tell us... or not.
- Thread.sleep(SLEEP_MS_FAILED_ATTEMPT);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- // This can throw an exception if the pool is shutting down.
- executor.execute(this);
- }
- } finally {
- long now = BlobStoreUtils.getCurrentTimeMs();
- long runTimeMs = now - startTimeMs;
- long startLatency = now - this.queuedTimeMs;
- String message = String.format(Locale.ROOT,
- "sharedBlobName=%s action=DELETE storageProvider=%s bucketRegion=%s bucketName=%s "
- + "runTime=%s startLatency=%s attempt=%s filesAffected=%s isSuccess=%s",
- sharedBlobName, client.getStorageProvider().name(), client.getBucketRegion(),
- client.getBucketName(), runTimeMs, startLatency, attempt.get(), this.blobNames.size(), isSuccess);
- log.info(message);
- }
+ @Override
+ public String getActionName() {
+ return "DELETE_FILES_PREFIXED";
+ }
+
+ @Override
+ public String getAdditionalLogMessage() {
+ return "filesAffected=" + affectedFiles;
+ }
+
+ @Override
+ public String toString() {
+ return "BlobCollectionDeletionTask action=" + getActionName() + " " + super.toString();
+ }
}
}
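New task types follow the same shape as the two concrete tasks above: extend BlobDeleterTask, perform the storage-client call in doDelete(), and report an action name for logging. A hypothetical example follows; the class, its action name, and the single-file use case are invented for illustration and are not part of this commit.

import java.util.Collection;
import java.util.Collections;

import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.process.BlobDeleterTask;

/** Hypothetical task deleting a single named blob file; illustrative only. */
public class SingleFileDeletionTask extends BlobDeleterTask {
  private final CoreStorageClient client;
  private final String blobName;

  public SingleFileDeletionTask(CoreStorageClient client, String collectionName,
                                String blobName, boolean allowRetry, int maxAttempts) {
    super(client, collectionName, allowRetry, maxAttempts);
    this.client = client;
    this.blobName = blobName;
  }

  @Override
  public Collection<String> doDelete() throws Exception {
    // deleteBlobs takes a Collection<String> of blob file paths
    client.deleteBlobs(Collections.singleton(blobName));
    return Collections.singletonList(blobName);
  }

  @Override
  public String getActionName() {
    return "DELETE_SINGLE_FILE"; // invented action name, used only in the log line emitted by call()
  }
}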
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
index 16a66b7..9fec3f4 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
@@ -84,9 +84,6 @@ public class SharedStoreManager {
return blobStorageProvider;
}
- /*
- * Initiates a BlobDeleteManager if it doesn't exist and returns one
- */
public BlobDeleteManager getBlobDeleteManager() {
if (blobDeleteManager != null) {
return blobDeleteManager;
@@ -123,5 +120,13 @@ public class SharedStoreManager {
public void initConcurrencyController(SharedCoreConcurrencyController concurrencyController) {
this.sharedCoreConcurrencyController = concurrencyController;
}
+
+ @VisibleForTesting
+ public void initBlobDeleteManager(BlobDeleteManager blobDeleteManager) {
+ if (this.blobDeleteManager != null) {
+ blobDeleteManager.shutdown();
+ }
+ this.blobDeleteManager = blobDeleteManager;
+ }
}
\ No newline at end of file
diff --git a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
index d03da40..b35b178 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
@@ -24,6 +24,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.io.FileUtils;
@@ -88,6 +89,24 @@ public class CoreStorageClientTest extends SolrTestCaseJ4 {
int expectedBlobKeyLength = TEST_CORE_NAME_1.length() + uuid4length + 1 + 4;
Assert.assertEquals(blobPath.length(), expectedBlobKeyLength);
}
+
+ @Test
+ public void testListBlobFiles() throws Exception {
+ List<String> expectedPaths = new LinkedList<>();
+ expectedPaths.add(
+ blobClient.pushStream(TEST_CORE_NAME_1, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "xxx"));
+ expectedPaths.add(
+ blobClient.pushStream(TEST_CORE_NAME_1, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "yyy"));
+ expectedPaths.add(
+ blobClient.pushStream(TEST_CORE_NAME_2, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "zzzz"));
+ expectedPaths.add(
+ blobClient.pushStream(TEST_CORE_NAME_2, new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "1234"));
+ blobClient.pushStream("s_test_core_nomatch", new ByteArrayInputStream(EMPTY_BYTES_ARR), EMPTY_BYTES_ARR.length, "1234");
+ // the common prefix is s_test_core_name for these blob files
+ List<String> blobFiles = blobClient.listCoreBlobFiles("s_test_core_name");
+ Assert.assertEquals(expectedPaths.size(), blobFiles.size());
+ Assert.assertTrue(blobFiles.toString(), blobFiles.containsAll(expectedPaths));
+ }
@Test
public void testListBlobCommonPrefixes() throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java
new file mode 100644
index 0000000..aa7774d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/BlobDeleteProcessorTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.store.blob.client.BlobException;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.client.LocalStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobFileDeletionTask;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobPrefixedFileDeletionTask;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BlobDeleteProcessor}
+ */
+public class BlobDeleteProcessorTest extends SolrTestCaseJ4 {
+
+ private static String DEFAULT_PROCESSOR_NAME = "DeleterForTest";
+ private static Path sharedStoreRootPath;
+ private static CoreStorageClient blobClient;
+
+ private static List<BlobDeleterTask> enqueuedTasks;
+
+ @BeforeClass
+ public static void setupTestClass() throws Exception {
+ sharedStoreRootPath = createTempDir("tempDir");
+ System.setProperty(LocalStorageClient.BLOB_STORE_LOCAL_FS_ROOT_DIR_PROPERTY, sharedStoreRootPath.resolve("LocalBlobStore/").toString());
+ blobClient = new LocalStorageClient() {
+
+ // no ops for BlobFileDeletionTask and BlobPrefixedFileDeletionTask to execute successfully
+ @Override
+ public void deleteBlobs(Collection<String> paths) throws BlobException {
+ return;
+ }
+
+ // no ops for BlobFileDeletionTask and BlobPrefixedFileDeletionTask to execute successfully
+ @Override
+ public List<String> listCoreBlobFiles(String prefix) throws BlobException {
+ return new LinkedList<>();
+ }
+ };
+ }
+
+ @Before
+ public void setup() {
+ enqueuedTasks = new LinkedList<BlobDeleterTask>();
+ }
+
+ /**
+ * Verify we enqueue a {@link BlobFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}.
+ */
+ @Test
+ public void testDeleteFilesEnqueueTask() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 5;
+ int retryDelay = 500;
+ String name = "testName";
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ Set<String> names = new HashSet<>();
+ names.add("test1");
+ names.add("test2");
+ // uses the specified defaultMaxAttempts at the processor (not task) level
+ CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteFiles(name, names, true);
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+ assertEquals(1, enqueuedTasks.size());
+
+ assertEquals(1, enqueuedTasks.size());
+ assertNotNull(res);
+ assertEquals(1, res.getTask().getAttempts());
+ assertEquals(true, res.isSuccess());
+ assertEquals(false, res.shouldRetry());
+ }
+ }
+
+ /**
+ * Verify we enqueue a {@link BlobPrefixedFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}.
+ */
+ @Test
+ public void testDeleteShardEnqueueTask() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 5;
+ int retryDelay = 500;
+ String name = "testName";
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // uses the specified defaultMaxAttempts at the processor (not task) level
+ CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteCollection(name, true);
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+ assertEquals(1, enqueuedTasks.size());
+
+ assertEquals(1, enqueuedTasks.size());
+ assertNotNull(res);
+ assertEquals(1, res.getTask().getAttempts());
+ assertEquals(true, res.isSuccess());
+ assertEquals(false, res.shouldRetry());
+ }
+ }
+
+ /**
+ * Verify we enqueue a {@link BlobPrefixedFileDeletionTask} with the correct parameters.
+ * Note we're not testing the functionality of the deletion task here, only that the processor successfully
+ * handles the task. End-to-end blob deletion tests can be found in {@link SharedStoreDeletionProcessTest}.
+ */
+ @Test
+ public void testDeleteCollectionEnqueueTask() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 5;
+ int retryDelay = 500;
+ String name = "testName";
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // uses the specified defaultMaxAttempts at the processor (not task) level
+ CompletableFuture<BlobDeleterTaskResult> cf = processor.deleteCollection(name, true);
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+ assertEquals(1, enqueuedTasks.size());
+
+ assertNotNull(res);
+ assertEquals(1, res.getTask().getAttempts());
+ assertTrue(res.isSuccess());
+ assertFalse(res.shouldRetry());
+ }
+ }
+
+ /**
+ * Verify that we don't retry tasks that are not configured to be retried
+ * when they end up failing
+ */
+ @Test
+ public void testNonRetryableTask() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 1; // ignored when we build the test task
+ int retryDelay = 500;
+ int totalAttempts = 5; // total number of times the task may be attempted
+
+ String name = "testName";
+ boolean isRetry = false;
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // enqueue a task that fails and is not retryable
+ CompletableFuture<BlobDeleterTaskResult> cf =
+ processor.enqueue(buildFailingTaskForTest(blobClient, name, totalAttempts, false), isRetry);
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+
+ // the single attempt fails and is not retried
+ assertEquals(1, enqueuedTasks.size());
+ assertNotNull(res);
+ assertEquals(1, res.getTask().getAttempts());
+ assertFalse(res.isSuccess());
+ assertFalse(res.shouldRetry());
+
+ // initial error + 0 retry errors suppressed
+ assertNotNull(res.getError());
+ assertEquals(0, res.getError().getSuppressed().length);
+ }
+ }
+
+ /**
+ * Verify that the retry logic kicks in for tasks configured to retry
+ * and that a subsequent retry succeeds
+ */
+ @Test
+ public void testRetryableTaskSucceeds() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 1; // ignored when we build the test task
+ int retryDelay = 500;
+ int totalAttempts = 5; // total number of times the task may be attempted
+ int totalFails = 3; // total number of times the task should fail
+
+ String name = "testName";
+ boolean isRetry = false;
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // enqueue a task that fails totalFails number of times before succeeding
+ CompletableFuture<BlobDeleterTaskResult> cf =
+ processor.enqueue(buildScheduledFailingTaskForTest(blobClient, name, totalAttempts, true, totalFails), isRetry);
+
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+
+ // the first 3 attempts fail and the last one succeeds
+ assertEquals(4, enqueuedTasks.size());
+
+ assertNotNull(res);
+ assertEquals(4, res.getTask().getAttempts());
+ assertTrue(res.isSuccess());
+
+ // initial error + 2 retry errors suppressed
+ assertNotNull(res.getError());
+ assertEquals(2, res.getError().getSuppressed().length);
+ }
+ }
+
+ /**
+ * Verify that after all task attempts are exhausted we bail out
+ */
+ @Test
+ public void testRetryableTaskFails() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 1; // ignored when we build the test task
+ int retryDelay = 500;
+ int totalAttempts = 5; // total number of times the task may be attempted
+
+ String name = "testName";
+ boolean isRetry = false;
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // enqueue a task that fails every time it runs but is configured to retry
+ CompletableFuture<BlobDeleterTaskResult> cf =
+ processor.enqueue(buildFailingTaskForTest(blobClient, name, totalAttempts, true), isRetry);
+
+ // wait for this task and all its potential retries to finish
+ BlobDeleterTaskResult res = cf.get(5000, TimeUnit.MILLISECONDS);
+ // 1 initial enqueue + 4 retries
+ assertEquals(5, enqueuedTasks.size());
+
+ assertNotNull(res);
+ assertEquals(5, res.getTask().getAttempts());
+ assertFalse(res.isSuccess());
+ // shouldRetry should be false once all attempts are exhausted
+ assertFalse(res.shouldRetry());
+
+ // initial error + 4 retry errors suppressed
+ assertNotNull(res.getError());
+ assertEquals(4, res.getError().getSuppressed().length);
+ }
+ }
+
+ /**
+ * Verify that we cannot add more deletion tasks to the processor once the work queue
+ * is at its target max, but that tasks which are retries can still be re-added to the queue
+ */
+ @Test
+ public void testWorkQueueFull() throws Exception {
+ int maxQueueSize = 3;
+ int numThreads = 1;
+ int defaultMaxAttempts = 1;
+ int retryDelay = 1000;
+
+ String name = "testName";
+ boolean allowRetry = false;
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ // numThreads is 1 and we'll enqueue a blocking task that keeps the pool
+ // occupied while we subsequently add new tasks to test enqueue rejection
+ CountDownLatch tasklatch = new CountDownLatch(1);
+ processor.enqueue(buildBlockingTaskForTest(tasklatch), allowRetry);
+
+ // Fill the internal work queue well beyond maxQueueSize; the queue size limit is only
+ // approximately enforced, so we add far past the max to guarantee rejection
+ for (int i = 0; i < maxQueueSize*2; i++) {
+ try {
+ processor.deleteCollection(name, allowRetry);
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+
+ // verify adding a new task is rejected
+ try {
+ processor.deleteCollection(name, allowRetry);
+ fail("Task should have been rejected");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("Unable to enqueue deletion"));
+ }
+ CompletableFuture<BlobDeleterTaskResult> cf = null;
+ try {
+ // verify adding a task that is marked as a retry is not rejected
+ cf = processor.enqueue(buildFailingTaskForTest(blobClient, name, 5, true), /* isRetry */ true);
+ } catch (Exception ex) {
+ fail("Task should not have been rejected");
+ }
+
+ // clean up and unblock the task
+ tasklatch.countDown();
+ }
+ }
+
+ /**
+ * Verify that with a continuous stream of delete tasks being enqueued, all eventually complete
+ * in the face of failing tasks and retries, without locking up our pool anywhere
+ */
+ @Test
+ public void testSimpleConcurrentDeletionEnqueues() throws Exception {
+ int maxQueueSize = 200;
+ int numThreads = 5;
+ int defaultMaxAttempts = 5;
+ int retryDelay = 100;
+ int numberOfTasks = 200;
+
+ try (BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(enqueuedTasks, blobClient,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay)) {
+ List<BlobDeleterTask> tasks = generateRandomTasks(defaultMaxAttempts, numberOfTasks);
+ List<CompletableFuture<BlobDeleterTaskResult>> taskResultsFutures = new LinkedList<>();
+ List<BlobDeleterTaskResult> results = new LinkedList<>();
+ for (BlobDeleterTask t : tasks) {
+ taskResultsFutures.add(processor.enqueue(t, false));
+ }
+
+ taskResultsFutures.forEach(cf -> {
+ try {
+ results.add(cf.get(20000, TimeUnit.MILLISECONDS));
+ } catch (Exception ex) {
+ fail("We timed out on some task!");
+ }
+ });
+
+ // we shouldn't enqueue more than (numberOfTasks * defaultMaxAttempts) tasks to the pool
+ assertTrue(enqueuedTasks.size() < (numberOfTasks * defaultMaxAttempts));
+ assertEquals(numberOfTasks, results.size());
+ int totalAttempts = 0;
+ for (BlobDeleterTaskResult res : results) {
+ assertNotNull(res);
+ assertNotNull(res.getTask());
+ assertEquals("scheduledFailingTask", res.getTask().getActionName());
+ totalAttempts += res.getTask().getAttempts();
+ }
+ // total task attempts should be consistent with our test scaffolding
+ assertTrue(totalAttempts < (numberOfTasks * defaultMaxAttempts));
+ }
+ }
+
+ private List<BlobDeleterTask> generateRandomTasks(int defaultMaxAttempts, int taskCount) {
+ List<BlobDeleterTask> tasks = new LinkedList<>();
+ for (int i = 0; i < taskCount; i++) {
+ int totalAttempts = random().nextInt(defaultMaxAttempts);
+ int totalFails = random().nextInt(defaultMaxAttempts + 1);
+ BlobDeleterTask task = buildScheduledFailingTaskForTest(blobClient, "test"+i, totalAttempts, true, totalFails);
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ /**
+ * Returns a test-only task that simply blocks on the given latch, keeping a worker thread occupied
+ */
+ private BlobDeleterTask buildBlockingTaskForTest(CountDownLatch latch) {
+ return new BlobDeleterTask(null, null, false, 0) {
+ @Override
+ public Collection<String> doDelete() throws Exception {
+ // block until something forces this latch to count down
+ latch.await();
+ return new LinkedList<>();
+ }
+
+ @Override
+ public String getActionName() { return "blockingTask"; }
+ };
+ }
+
+ /**
+ * Returns a test-only task that always fails on action execution by throwing an
+ * exception
+ */
+ private BlobDeleterTask buildFailingTaskForTest(CoreStorageClient client,
+ String collectionName, int maxRetries, boolean allowRetries) {
+ return new BlobDeleterTask(client, collectionName, allowRetries, maxRetries) {
+ @Override
+ public Collection<String> doDelete() throws Exception {
+ throw new Exception("");
+ }
+
+ @Override
+ public String getActionName() { return "failingTask"; }
+ };
+ }
+
+ /**
+ * Returns a test-only task that fails a specified number of times before succeeding
+ */
+ private BlobDeleterTask buildScheduledFailingTaskForTest(CoreStorageClient client,
+ String collectionName, int maxRetries, boolean allowRetries, int failTotal) {
+ return new BlobDeleterTask(client, collectionName, allowRetries, maxRetries) {
+ private AtomicInteger failCount = new AtomicInteger(0);
+
+ @Override
+ public Collection<String> doDelete() throws Exception {
+ if (failCount.get() < failTotal) {
+ failCount.incrementAndGet();
+ throw new Exception("");
+ }
+ return new LinkedList<>();
+ }
+
+ @Override
+ public String getActionName() { return "scheduledFailingTask"; }
+ };
+ }
+
+ // enables capturing all enqueues to the executor pool, including retries
+ private BlobDeleteProcessor buildBlobDeleteProcessorForTest(List<BlobDeleterTask> enqueuedTasks,
+ CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, int defaultMaxDeleteAttempts,
+ long fixedRetryDelay) {
+ return new BlobDeleteProcessorForTest(DEFAULT_PROCESSOR_NAME, client, almostMaxQueueSize, numDeleterThreads,
+ defaultMaxDeleteAttempts, fixedRetryDelay, enqueuedTasks);
+ }
+
+ class BlobDeleteProcessorForTest extends BlobDeleteProcessor {
+ List<BlobDeleterTask> enqueuedTasks;
+
+ public BlobDeleteProcessorForTest(String name, CoreStorageClient client, int almostMaxQueueSize,
+ int numDeleterThreads, int defaultMaxDeleteAttempts, long fixedRetryDelay, List<BlobDeleterTask> enqueuedTasks) {
+ super(name, client, almostMaxQueueSize, numDeleterThreads, defaultMaxDeleteAttempts, fixedRetryDelay);
+ this.enqueuedTasks = enqueuedTasks;
+ }
+
+ @Override
+ protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
+ enqueuedTasks.add(task);
+ return super.enqueue(task, isRetry);
+ }
+ }
+}
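
For readers skimming the new tests, the contract they exercise is small: BlobDeleteProcessor#deleteFiles/deleteShard/deleteCollection enqueue a BlobDeleterTask and hand back a CompletableFuture<BlobDeleterTaskResult> that only completes once the task and any retries have finished. A minimal caller-side sketch, assuming only the signatures visible in this patch plus the usual java.util.concurrent imports; the wrapper method, timeout value, and comments are illustrative, not part of the commit:

    // Hypothetical caller-side sketch; only the BlobDeleteProcessor / BlobDeleterTaskResult
    // calls shown elsewhere in this patch are assumed to exist.
    static void deleteCollectionFromSharedStore(BlobDeleteProcessor processor, String collection)
        throws Exception {
      CompletableFuture<BlobDeleterTaskResult> future =
          processor.deleteCollection(collection, /* allowRetry */ true);
      // completes only after the task and any of its retries have run
      BlobDeleterTaskResult res = future.get(5, TimeUnit.SECONDS);
      if (!res.isSuccess() && !res.shouldRetry()) {
        // all attempts exhausted; per-attempt failures are attached as suppressed exceptions
        for (Throwable suppressed : res.getError().getSuppressed()) {
          // inspect or log each failed attempt here
        }
      }
    }
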
diff --git a/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java
new file mode 100644
index 0000000..179b5d7
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/SharedStoreDeletionProcessTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.metadata.DeleteBlobStrategyTest;
+import org.apache.solr.store.blob.process.BlobDeleterTask.BlobDeleterTaskResult;
+import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests around the deletion of files from shared storage via background deletion
+ * processes, as triggered by normal indexing and collection API calls
+ */
+public class SharedStoreDeletionProcessTest extends SolrCloudSharedStoreTestCase {
+
+ private static final String DEFAULT_PROCESSOR_NAME = "DeleterForTest";
+ private static Path sharedStoreRootPath;
+
+ private List<CompletableFuture<BlobDeleterTaskResult>> taskFutures;
+ private List<CompletableFuture<BlobDeleterTaskResult>> overseerTaskFutures;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ sharedStoreRootPath = createTempDir("tempDir");
+ }
+
+ @Before
+ public void setupTest() {
+ taskFutures = new LinkedList<>();
+ overseerTaskFutures = new LinkedList<>();
+ }
+
+ @After
+ public void teardownTest() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ // clean up the shared store after each test; the temp dir cleans itself up after the
+ // test class finishes
+ FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+ }
+
+ /**
+ * Test that verifies that files marked for deletion during the indexing process are
+ * enqueued for deletion and then deleted. This test case differs from {@link DeleteBlobStrategyTest}
+ * because it tests the end-to-end indexing flow with a {@link MiniSolrCloudCluster}
+ */
+ @Test
+ public void testIndexingTriggersDeletes() throws Exception {
+ setupCluster(1);
+ setupSolrNodes();
+ JettySolrRunner node = cluster.getJettySolrRunner(0);
+ int numThreads = 5;
+ int targetMaxAttempts = 5;
+ // files don't need to age before being marked for deletion; in this test they are deleted as soon as they are indexed
+ int delay = 0;
+ initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+ taskFutures, overseerTaskFutures);
+
+ // set up the collection
+ String collectionName = "SharedCollection";
+ int maxShardsPerNode = 1;
+ int numReplicas = 1;
+ String shardNames = "shard1";
+ setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+
+ // index and track deletions; commits are implicit in shared storage, so we expect a push to occur
+ // per batch and, after every batch except the very first, superseded index files to be enqueued for deletion
+ sendIndexingBatch(collectionName, /*numDocs */ 1, /* docIdStart */ 0);
+ assertEquals(0, taskFutures.size());
+
+ // next indexing batch should cause files to be added for deletion
+ sendIndexingBatch(collectionName, /*numDocs */ 1, /* docIdStart */ 1);
+ assertEquals(1, taskFutures.size());
+
+ CompletableFuture<BlobDeleterTaskResult> cf = taskFutures.get(0);
+ BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+
+ // verify the files were deleted
+ CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+ assertTrue(result.isSuccess());
+ assertFilesDeleted(storageClient, result);
+ }
+
+ /**
+ * Test that verifies that the collection deletion command deletes all files for the given collection on the
+ * happy path
+ */
+ @Test
+ public void testDeleteCollectionCommand() throws Exception {
+ setupCluster(1);
+ setupSolrNodes();
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ JettySolrRunner node = cluster.getJettySolrRunner(0);
+ int numThreads = 5;
+ int targetMaxAttempts = 5;
+ // files don't need to age before being marked for deletion; in this test they are deleted as soon as they are indexed
+ int delay = 0;
+ initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+ taskFutures, overseerTaskFutures);
+
+ // set up the collection
+ String collectionName = "SharedCollection";
+ int maxShardsPerNode = 10;
+ int numReplicas = 1;
+ String shardNames = "shard1,shard2";
+ setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+
+ // index a bunch of docs
+ for (int i = 0; i < 10; i++) {
+ int numDocs = 100;
+ sendIndexingBatch(collectionName, numDocs, i*numDocs);
+ }
+ assertTrue(taskFutures.size() > 0);
+ assertEquals(0, overseerTaskFutures.size());
+
+ // do collection deletion
+ CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collectionName);
+ delete.process(cloudClient).getResponse();
+ assertEquals(1, overseerTaskFutures.size());
+
+ // wait for the deletion command to complete
+ CompletableFuture<BlobDeleterTaskResult> cf = overseerTaskFutures.get(0);
+ BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+
+ // the collection should no longer exist in ZooKeeper
+ cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
+ assertNull(cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collectionName));
+
+ // verify the files in the deletion tasks were all deleted
+ CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+ assertTrue(result.isSuccess());
+ assertFilesDeleted(storageClient, result);
+
+ // verify no files belonging to this collection exist on shared storage
+ List<String> names = storageClient.listCoreBlobFiles(collectionName);
+ assertTrue(names.isEmpty());
+ }
+
+ /**
+ * Test that verifies that the shard deletion command deletes all files for the given shard on the
+ * happy path
+ */
+ @Test
+ public void testDeleteShardCommand() throws Exception {
+ setupCluster(1);
+ setupSolrNodes();
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ JettySolrRunner node = cluster.getJettySolrRunner(0);
+ int numThreads = 5;
+ int targetMaxAttempts = 5;
+ // files don't need to age before being marked for deletion; in this test they are deleted as soon as they are indexed
+ int delay = 0;
+ initiateDeleteManagerForTest(node, targetMaxAttempts, numThreads, delay,
+ taskFutures, overseerTaskFutures);
+
+ // set up the collection
+ String collectionName = "SharedCollection";
+ int maxShardsPerNode = 10;
+ int numReplicas = 1;
+ String shardNames = "shard1";
+ setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+
+ // index a bunch of docs
+ for (int i = 0; i < 10; i++) {
+ int numDocs = 100;
+ sendIndexingBatch(collectionName, numDocs, i*numDocs);
+ }
+ assertTrue(taskFutures.size() > 0);
+ assertEquals(0, overseerTaskFutures.size());
+
+ // split the shard so the parent is set inactive and can be deleted
+ CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
+ .setShardName("shard1");
+ splitShard.process(cloudClient);
+ waitForState("Timed out waiting for sub shards to be active.",
+ collectionName, activeClusterShape(2, 3));
+
+ // do shard deletion on the parent
+ CollectionAdminRequest.DeleteShard delete = CollectionAdminRequest.deleteShard(collectionName, "shard1");
+ delete.process(cloudClient).getResponse();
+ assertEquals(1, overseerTaskFutures.size());
+
+ // wait for the deletion command to complete
+ CompletableFuture<BlobDeleterTaskResult> cf = overseerTaskFutures.get(0);
+ BlobDeleterTaskResult result = cf.get(5000, TimeUnit.MILLISECONDS);
+
+ // the shard should no longer exist in ZooKeeper
+ cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
+ DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollectionOrNull(collectionName);
+ assertNotNull(coll);
+ assertNull(coll.getSlice("shard1"));
+
+ // verify the files in the deletion tasks were all deleted
+ CoreStorageClient storageClient = node.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+ assertTrue(result.isSuccess());
+ assertFilesDeleted(storageClient, result);
+
+ // verify no files belonging to shard1 in the collection exist on shared storage
+ List<String> names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1/"));
+ assertTrue(names.isEmpty());
+
+ // verify files belonging to shard1_0 and shard1_1 still exist
+ names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1_0"));
+ assertFalse(names.isEmpty());
+ names = storageClient.listCoreBlobFiles(Assign.buildSharedShardName(collectionName, "shard1_1"));
+ assertFalse(names.isEmpty());
+ }
+
+ private void assertFilesDeleted(CoreStorageClient storageClient, BlobDeleterTaskResult result) throws IOException {
+ Collection<String> filesDeleted = result.getFilesDeleted();
+ for (String filename : filesDeleted) {
+ try (InputStream s = storageClient.pullStream(filename)) {
+ fail(filename + " should have been deleted from shared store");
+ } catch (Exception ex) {
+ if (!(ex.getCause() instanceof FileNotFoundException)) {
+ fail("Unexpected exception thrown = " + ex.getMessage());
+ }
+ }
+ }
+ }
+
+ private void sendIndexingBatch(String collectionName, int numberOfDocs, int docIdStart) throws SolrServerException, IOException {
+ UpdateRequest updateReq = new UpdateRequest();
+ for (int k = docIdStart; k < docIdStart + numberOfDocs; k++) {
+ updateReq.add("id", Integer.toString(k));
+ }
+ updateReq.process(cluster.getSolrClient(), collectionName);
+ }
+
+ private BlobDeleteManager initiateDeleteManagerForTest(JettySolrRunner solrRunner,
+ int almostMaxQueueSize, int numDeleterThreads, int deleteDelayMs, List<CompletableFuture<BlobDeleterTaskResult>> taskFutures,
+ List<CompletableFuture<BlobDeleterTaskResult>> overseerTaskFutures) {
+ CoreStorageClient client = solrRunner.getCoreContainer().getSharedStoreManager().getBlobStorageProvider().getClient();
+ int maxQueueSize = 200;
+ int numThreads = 5;
+ int defaultMaxAttempts = 5;
+ int retryDelay = 500;
+
+ // setup processors with the same defaults but enhanced to capture future results
+ BlobDeleteProcessor processor = buildBlobDeleteProcessorForTest(taskFutures, client,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay);
+ BlobDeleteProcessor overseerProcessor = buildBlobDeleteProcessorForTest(overseerTaskFutures, client,
+ maxQueueSize, numThreads, defaultMaxAttempts, retryDelay);
+
+ BlobDeleteManager deleteManager = new BlobDeleteManager(client, deleteDelayMs, processor, overseerProcessor);
+ setupBlobDeleteManagerForNode(deleteManager, solrRunner);
+ return deleteManager;
+ }
+
+ private void setupSolrNodes() throws Exception {
+ for (JettySolrRunner process : cluster.getJettySolrRunners()) {
+ CoreStorageClient storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
+ setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), process);
+ }
+ }
+
+ // enables capturing all enqueues to the executor pool, including retries
+ private BlobDeleteProcessor buildBlobDeleteProcessorForTest(List<CompletableFuture<BlobDeleterTaskResult>> taskFutures,
+ CoreStorageClient client, int almostMaxQueueSize, int numDeleterThreads, int defaultMaxDeleteAttempts,
+ long fixedRetryDelay) {
+ return new BlobDeleteProcessorForTest(DEFAULT_PROCESSOR_NAME, client, almostMaxQueueSize, numDeleterThreads,
+ defaultMaxDeleteAttempts, fixedRetryDelay, taskFutures);
+ }
+
+ /**
+ * Test class extending BlobDeleteProcessor to allow capturing task futures
+ */
+ class BlobDeleteProcessorForTest extends BlobDeleteProcessor {
+ List<CompletableFuture<BlobDeleterTaskResult>> futures;
+
+ public BlobDeleteProcessorForTest(String name, CoreStorageClient client, int almostMaxQueueSize,
+ int numDeleterThreads, int defaultMaxDeleteAttempts, long fixedRetryDelay,
+ List<CompletableFuture<BlobDeleterTaskResult>> taskFutures) {
+ super(name, client, almostMaxQueueSize, numDeleterThreads, defaultMaxDeleteAttempts, fixedRetryDelay);
+ this.futures = taskFutures;
+ }
+
+ @Override
+ protected CompletableFuture<BlobDeleterTaskResult> enqueue(BlobDeleterTask task, boolean isRetry) {
+ CompletableFuture<BlobDeleterTaskResult> futureRes = super.enqueue(task, isRetry);
+ futures.add(futureRes);
+ return futureRes;
+ }
+ }
+}
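
The per-node wiring these end-to-end tests rely on boils down to three steps: back the node with a local blob store, wrap its storage client in processors that capture task futures, and install a BlobDeleteManager built from those processors. A condensed sketch, assuming the helpers added in this patch are in scope of the test class; the wrapper method itself and the literal pool settings are illustrative:

    // Hypothetical recap of the wiring done by setupSolrNodes/initiateDeleteManagerForTest above.
    private void wireSharedStoreDeletion(JettySolrRunner node, Path sharedStoreRootPath,
        List<CompletableFuture<BlobDeleterTaskResult>> taskFutures,
        List<CompletableFuture<BlobDeleterTaskResult>> overseerTaskFutures) throws Exception {
      // 1. back the node with a local (on-disk) blob store
      CoreStorageClient client = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
      setupTestSharedClientForNode(getBlobStorageProviderTestInstance(client), node);

      // 2. processors that capture every enqueued task's future for later assertions
      BlobDeleteProcessor processor =
          buildBlobDeleteProcessorForTest(taskFutures, client, 200, 5, 5, 500);
      BlobDeleteProcessor overseerProcessor =
          buildBlobDeleteProcessorForTest(overseerTaskFutures, client, 200, 5, 5, 500);

      // 3. install a delete manager that uses those processors (deleteDelayMs = 0: delete immediately)
      BlobDeleteManager deleteManager = new BlobDeleteManager(client, 0, processor, overseerProcessor);
      setupBlobDeleteManagerForNode(deleteManager, node);
    }
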
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
index c22f24f..909443c 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
@@ -28,11 +28,12 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.store.blob.client.BlobCoreMetadata;
import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.client.LocalStorageClient;
+import org.apache.solr.store.blob.process.BlobDeleteManager;
import org.apache.solr.store.blob.process.BlobProcessUtil;
import org.apache.solr.store.blob.process.CorePullTask;
+import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
import org.apache.solr.store.blob.process.CorePullerFeeder;
import org.apache.solr.store.blob.process.CoreSyncStatus;
-import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
import org.apache.solr.store.blob.provider.BlobStorageProvider;
/**
@@ -100,6 +101,14 @@ public class SolrCloudSharedStoreTestCase extends SolrCloudTestCase {
SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
manager.initConcurrencyController(concurrencyController);
}
+
+ /**
+ * Configures the Solr process with the given {@link BlobDeleteManager}
+ */
+ protected static void setupBlobDeleteManagerForNode(BlobDeleteManager deleteManager, JettySolrRunner solrRunner) {
+ SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
+ manager.initBlobDeleteManager(deleteManager);
+ }
/**
* Return a new CoreStorageClient that writes to the specified sharedStoreRootPath and blobDirectoryName