You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:53:20 UTC

[lucene] 06/45: SOLR-15101: Add list/delete APIs for incremental backups (#2379)

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit 136408b03187c41b28b97e5118e4f945f1526a9e
Author: Jason Gerlowski <ge...@apache.org>
AuthorDate: Tue Feb 16 19:55:25 2021 -0500

    SOLR-15101: Add list/delete APIs for incremental backups (#2379)
    
    SOLR-13608 introduces a new "incremental" backup format, which allows
    storage of multiple backup "points" in the same location. This
    development introduces a need for APIs to manage these potentially
    plural backups.
    
    This commit introduces /admin/collections?action=LISTBACKUPS and
    /admin/collections?action=DELETEBACKUP to handle these backups.
---
 .../cloud/api/collections/DeleteBackupCmd.java     | 200 +++++++++++++++++-
 .../java/org/apache/solr/core/CoreContainer.java   |   6 +-
 .../apache/solr/core/backup/BackupProperties.java  |   2 +-
 .../apache/solr/handler/CollectionBackupsAPI.java  |  68 ++++++
 .../org/apache/solr/handler/CollectionsAPI.java    |   1 -
 .../solr/handler/admin/CollectionsHandler.java     | 138 ++++++++++++-
 .../BackupRestoreApiErrorConditionsTest.java       | 211 +++++++++++++++++++
 .../solr/cloud/api/collections/PurgeGraphTest.java | 187 +++++++++++++++++
 .../admin/V2CollectionBackupsAPIMappingTest.java   | 135 ++++++++++++
 solr/solr-ref-guide/src/collection-management.adoc | 205 ++++++++++++++++++
 .../src/making-and-restoring-backups.adoc          |   4 +-
 .../solrj/request/CollectionAdminRequest.java      | 228 ++++++++++++++++++++-
 .../solrj/request/beans/DeleteBackupPayload.java   |  44 ++++
 .../solrj/request/beans/ListBackupPayload.java     |  32 +++
 .../apache/solr/common/params/CoreAdminParams.java |   5 +
 .../org/apache/solr/common/params/V2ApiParams.java |  34 +++
 .../solrj/request/TestCollectionAdminRequest.java  |  43 ++++
 .../collections/AbstractIncrementalBackupTest.java |  74 ++++++-
 18 files changed, 1584 insertions(+), 33 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteBackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteBackupCmd.java
index 5eba388..5abd49c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteBackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteBackupCmd.java
@@ -58,11 +58,8 @@ import static org.apache.solr.core.backup.BackupManager.START_TIME_PROP;
  * An overseer command used to delete files associated with incremental backups.
  *
  * This assumes use of the incremental backup format, and not the (now deprecated) traditional 'full-snapshot' format.
- * The deletion can either delete a specific {@link BackupId}, or delete everything except the most recent N backup
- * points.
- *
- * While this functionality is structured as an overseer command, the message type isn't currently handled by the
- * overseer or recorded to the overseer queue by any APIs.  This integration will be added in the forthcoming SOLR-15101
+ * The deletion can either delete a specific {@link BackupId}, delete everything except the most recent N backup
+ * points, or can be used to trigger a "garbage collection" of unused index files in the backup repository.
  */
 public class DeleteBackupCmd implements OverseerCollectionMessageHandler.Cmd {
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -80,9 +77,10 @@ public class DeleteBackupCmd implements OverseerCollectionMessageHandler.Cmd {
         String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
         int backupId = message.getInt(CoreAdminParams.BACKUP_ID, -1);
         int lastNumBackupPointsToKeep = message.getInt(CoreAdminParams.MAX_NUM_BACKUP_POINTS, -1);
-        if (backupId == -1 && lastNumBackupPointsToKeep == -1) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-                    String.format(Locale.ROOT, "%s or %s param must be provided", CoreAdminParams.BACKUP_ID, CoreAdminParams.MAX_NUM_BACKUP_POINTS));
+        boolean purge = message.getBool(CoreAdminParams.BACKUP_PURGE_UNUSED, false);
+        if (backupId == -1 && lastNumBackupPointsToKeep == -1 && !purge) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, String.format(Locale.ROOT, "%s, %s or %s param must be provided", CoreAdminParams.BACKUP_ID, CoreAdminParams.MAX_NUM_BACKUP_POINTS,
+                    CoreAdminParams.BACKUP_PURGE_UNUSED));
         }
         CoreContainer cc = ocmh.overseer.getCoreContainer();
         try (BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo))) {
@@ -94,7 +92,9 @@ public class DeleteBackupCmd implements OverseerCollectionMessageHandler.Cmd {
                         "backup-deletion is only supported on incremental backups");
             }
 
-            if (backupId != -1){
+            if (purge) {
+                purge(repository, backupPath, results);
+            } else if (backupId != -1){
                 deleteBackupId(repository, backupPath, backupId, results);
             } else {
                 keepNumberOfBackup(repository, backupPath, lastNumBackupPointsToKeep, results);
@@ -102,6 +102,28 @@ public class DeleteBackupCmd implements OverseerCollectionMessageHandler.Cmd {
         }
     }
 
+    /**
+     * Purges {@code backupPath}: deletes the index files, shard-metadata files and backup property files that are
+     * unreachable, incomplete or corrupted (as determined by {@link PurgeGraph}) and reports the counts in {@code result}.
+     */
+    @SuppressWarnings({"unchecked"})
+    void purge(BackupRepository repository, URI backupPath, @SuppressWarnings({"rawtypes"}) NamedList result) throws IOException {
+        PurgeGraph purgeGraph = new PurgeGraph();
+        purgeGraph.build(repository, backupPath);
+
+        BackupFilePaths backupPaths = new BackupFilePaths(repository, backupPath);
+        repository.delete(backupPaths.getIndexDir(), purgeGraph.indexFileDeletes, true);
+        repository.delete(backupPaths.getShardBackupMetadataDir(), purgeGraph.shardBackupMetadataDeletes, true);
+        repository.delete(backupPath, purgeGraph.backupIdDeletes, true);
+
+        @SuppressWarnings({"rawtypes"})
+        NamedList details = new NamedList();
+        details.add("numBackupIds", purgeGraph.backupIdDeletes.size());
+        details.add("numShardBackupIds", purgeGraph.shardBackupMetadataDeletes.size());
+        details.add("numIndexFiles", purgeGraph.indexFileDeletes.size());
+        result.add("deleted", details);
+    }
+
     /**
      * Keep most recent {@code  maxNumBackup} and delete the rest.
      */
@@ -215,9 +237,167 @@ public class DeleteBackupCmd implements OverseerCollectionMessageHandler.Cmd {
                                 int bid, @SuppressWarnings({"rawtypes"}) NamedList results) throws Exception {
         BackupId backupId = new BackupId(bid);
         if (!repository.exists(repository.resolve(backupPath, BackupFilePaths.getBackupPropsName(backupId)))) {
-            return;
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Backup ID [" + bid + "] not found; cannot be deleted");
         }
 
         deleteBackupIds(backupPath, repository, Collections.singleton(backupId), results);
     }
+
+    final static class PurgeGraph {
+        // graph nodes, keyed by file name, one map per kind of backup file
+        Map<String, Node> backupIdNodeMap = new HashMap<>();
+        Map<String, Node> shardBackupMetadataNodeMap = new HashMap<>();
+        Map<String, Node> indexFileNodeMap = new HashMap<>();
+
+        // delete queues: names of the files to remove, filled in by findDeletableNodes
+        List<String> backupIdDeletes = new ArrayList<>();
+        List<String> shardBackupMetadataDeletes = new ArrayList<>();
+        List<String> indexFileDeletes = new ArrayList<>();
+
+        public void build(BackupRepository repository, URI backupPath) throws IOException {
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, backupPath);
+            buildLogicalGraph(repository, backupPath);
+            findDeletableNodes(repository, backupPaths);
+        }
+
+        public void findDeletableNodes(BackupRepository repository, BackupFilePaths backupPaths) {
+            // mark nodes whose files are physically present; files unknown to the graph are queued for deletion
+            visitExistingNodes(repository.listAllOrEmpty(backupPaths.getShardBackupMetadataDir()),
+                    shardBackupMetadataNodeMap, shardBackupMetadataDeletes);
+            // this may be a long-running command
+            visitExistingNodes(repository.listAllOrEmpty(backupPaths.getIndexDir()),
+                    indexFileNodeMap, indexFileDeletes);
+
+            // for nodes which do not exist, propagate that information to their neighbors
+            shardBackupMetadataNodeMap.values().forEach(Node::propagateNotExisting);
+            indexFileNodeMap.values().forEach(Node::propagateNotExisting);
+
+            addDeleteNodesToQueue(backupIdNodeMap, backupIdDeletes);
+            addDeleteNodesToQueue(shardBackupMetadataNodeMap, shardBackupMetadataDeletes);
+            addDeleteNodesToQueue(indexFileNodeMap, indexFileDeletes);
+        }
+
+        /**
+         * Visits the files (nodes) that are actually present in the physical layer. A file that is not present in
+         * the {@code nodeMap} is scheduled for deletion by adding it to the {@code deleteQueue}.
+         */
+        private void visitExistingNodes(String[] existingNodeKeys, Map<String, Node> nodeMap, List<String> deleteQueue) {
+            for (String nodeKey : existingNodeKeys) {
+                Node node = nodeMap.get(nodeKey);
+
+                if (node == null) {
+                    deleteQueue.add(nodeKey);
+                } else {
+                    node.existing = true;
+                }
+            }
+        }
+
+        private <T> void addDeleteNodesToQueue(Map<T, Node> tNodeMap, List<T> deleteQueue) {
+            tNodeMap.forEach((key, value) -> {
+                if (value.delete) {
+                    deleteQueue.add(key);
+                }
+            });
+        }
+
+        Node getBackupIdNode(String backupPropsName) {
+            return backupIdNodeMap.computeIfAbsent(backupPropsName, bid -> {
+                Node node = new Node();
+                node.existing = true; // buildLogicalGraph only asks for ids found in the directory listing
+                return node;
+            });
+        }
+
+        Node getShardBackupIdNode(String shardBackupId) {
+            return shardBackupMetadataNodeMap.computeIfAbsent(shardBackupId, s -> new Node());
+        }
+
+        Node getIndexFileNode(String indexFile) {
+            return indexFileNodeMap.computeIfAbsent(indexFile, s -> new IndexFileNode());
+        }
+
+        void addEdge(Node node1, Node node2) {
+            node1.addNeighbor(node2);
+            node2.addNeighbor(node1);
+        }
+
+        private void buildLogicalGraph(BackupRepository repository, URI backupPath) throws IOException {
+            List<BackupId> backupIds = BackupFilePaths.findAllBackupIdsFromFileListing(repository.listAllOrEmpty(backupPath));
+            for (BackupId backupId : backupIds) {
+                BackupProperties backupProps = BackupProperties.readFrom(repository, backupPath,
+                        BackupFilePaths.getBackupPropsName(backupId));
+
+                Node backupIdNode = getBackupIdNode(BackupFilePaths.getBackupPropsName(backupId));
+                for (String shardBackupMetadataFilename : backupProps.getAllShardBackupMetadataFiles()) {
+                    Node shardBackupMetadataNode = getShardBackupIdNode(shardBackupMetadataFilename);
+                    addEdge(backupIdNode, shardBackupMetadataNode);
+
+                    ShardBackupMetadata shardBackupMetadata = ShardBackupMetadata.from(repository, backupPath,
+                            ShardBackupId.fromShardMetadataFilename(shardBackupMetadataFilename));
+                    if (shardBackupMetadata == null) // no metadata could be loaded: keep the node, add no index-file edges
+                        continue;
+
+                    for (String indexFile : shardBackupMetadata.listUniqueFileNames()) {
+                        Node indexFileNode = getIndexFileNode(indexFile);
+                        addEdge(indexFileNode, shardBackupMetadataNode);
+                    }
+                }
+            }
+        }
+    }
+
+    // Purge-graph node for a ShardBackupMetadata file or a BackupId (backup properties) file
+    static class Node {
+        List<Node> neighbors; // created lazily by addNeighbor; null while the node has no edges
+        boolean delete = false; // set by propagateDelete once the node is marked for deletion
+        boolean existing = false; // nodes with existing == false are never marked for deletion
+
+        void addNeighbor(Node node) {
+            if (neighbors == null) {
+                neighbors = new ArrayList<>();
+            }
+            neighbors.add(node);
+        }
+
+        void propagateNotExisting() {
+            if (existing) // only a missing node triggers deletion of its neighbors
+                return;
+
+            if (neighbors != null)
+                neighbors.forEach(Node::propagateDelete);
+        }
+
+        void propagateDelete() {
+            if (delete || !existing) // already handled, or nothing to delete: also stops the recursion
+                return;
+
+            delete = true;
+            if (neighbors != null) {
+                neighbors.forEach(Node::propagateDelete);
+            }
+        }
+    }
+
+    // Purge-graph node for an index file: marked for deletion only after every neighbor has requested it
+    final static class IndexFileNode extends Node {
+        int refCount = 0; // number of neighbors that have not yet asked for this file to be deleted
+
+        @Override
+        void addNeighbor(Node node) {
+            super.addNeighbor(node);
+            refCount++;
+        }
+
+        @Override
+        void propagateDelete() {
+            if (delete || !existing)
+                return;
+
+            refCount--; // one fewer neighbor still needs this file
+            if (refCount == 0) {
+                delete = true; // unlike Node, the deletion is not propagated any further
+            }
+        }
+    }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 2d48b30..d1d04da 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -89,6 +89,7 @@ import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
 import org.apache.solr.filestore.PackageStoreAPI;
 import org.apache.solr.handler.ClusterAPI;
+import org.apache.solr.handler.CollectionBackupsAPI;
 import org.apache.solr.handler.CollectionsAPI;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.SnapShooter;
@@ -726,7 +727,10 @@ public class CoreContainer {
     createHandler(ZK_PATH, ZookeeperInfoHandler.class.getName(), ZookeeperInfoHandler.class);
     createHandler(ZK_STATUS_PATH, ZookeeperStatusHandler.class.getName(), ZookeeperStatusHandler.class);
     collectionsHandler = createHandler(COLLECTIONS_HANDLER_PATH, cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
-    containerHandlers.getApiBag().registerObject(new CollectionsAPI(collectionsHandler));
+    final CollectionsAPI collectionsAPI = new CollectionsAPI(collectionsHandler);
+    containerHandlers.getApiBag().registerObject(collectionsAPI);
+    final CollectionBackupsAPI collectionBackupsAPI = new CollectionBackupsAPI(collectionsHandler);
+    containerHandlers.getApiBag().registerObject(collectionBackupsAPI);
     configSetsHandler = createHandler(CONFIGSETS_HANDLER_PATH, cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
     ClusterAPI clusterAPI = new ClusterAPI(collectionsHandler, configSetsHandler);
     containerHandlers.getApiBag().registerObject(clusterAPI);
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupProperties.java b/solr/core/src/java/org/apache/solr/core/backup/BackupProperties.java
index 56496ee..6bfa55e 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupProperties.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupProperties.java
@@ -41,7 +41,7 @@ import java.util.stream.Collectors;
 import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 
 /**
- * Represents a backup[-*].properties file and the methods to read/write it.
+ * Represents a backup[-*].properties file, responsible for holding whole-collection and whole-backup metadata.
  *
  * These files live in a different location and hold different metadata depending on the backup format used.  The (now
  * deprecated) traditional 'full-snapshot' backup format places this file at $LOCATION/$NAME, while the preferred
diff --git a/solr/core/src/java/org/apache/solr/handler/CollectionBackupsAPI.java b/solr/core/src/java/org/apache/solr/handler/CollectionBackupsAPI.java
new file mode 100644
index 0000000..aeeaf6d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/CollectionBackupsAPI.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.api.Command;
+import org.apache.solr.api.EndPoint;
+import org.apache.solr.api.PayloadObj;
+import org.apache.solr.client.solrj.request.beans.DeleteBackupPayload;
+import org.apache.solr.client.solrj.request.beans.ListBackupPayload;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.V2ApiParams;
+import org.apache.solr.handler.admin.CollectionsHandler;
+
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+import static org.apache.solr.common.params.CommonParams.ACTION;
+import static org.apache.solr.handler.ClusterAPI.wrapParams;
+import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;
+
+/**
+ * V2 API definitions for listing and deleting collection backups; each command delegates to the v1 {@link CollectionsHandler}.
+ */
+@EndPoint(
+        path = {"/c/backups", "/collections/backups"},
+        method = POST,
+        permission = COLL_EDIT_PERM)
+public class CollectionBackupsAPI {
+
+  private final CollectionsHandler collectionsHandler;
+
+  public CollectionBackupsAPI(CollectionsHandler collectionsHandler) {
+    this.collectionsHandler = collectionsHandler;
+  }
+
+  @Command(name = V2ApiParams.LIST_BACKUPS_CMD)
+  @SuppressWarnings("unchecked")
+  public void listBackups(PayloadObj<ListBackupPayload> obj) throws Exception {
+    final Map<String, Object> v1Params = obj.get().toMap(new HashMap<>());
+    v1Params.put(ACTION, CollectionParams.CollectionAction.LISTBACKUP.toLower()); // select the v1 LISTBACKUP action
+
+    collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(), v1Params), obj.getResponse());
+  }
+
+  @Command(name = V2ApiParams.DELETE_BACKUPS_CMD)
+  @SuppressWarnings("unchecked")
+  public void deleteBackups(PayloadObj<DeleteBackupPayload> obj) throws Exception {
+    final Map<String, Object> v1Params = obj.get().toMap(new HashMap<>());
+    v1Params.put(ACTION, CollectionParams.CollectionAction.DELETEBACKUP.toLower()); // select the v1 DELETEBACKUP action
+
+    collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(), v1Params), obj.getResponse());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/CollectionsAPI.java b/solr/core/src/java/org/apache/solr/handler/CollectionsAPI.java
index e02e5f2..49639fc 100644
--- a/solr/core/src/java/org/apache/solr/handler/CollectionsAPI.java
+++ b/solr/core/src/java/org/apache/solr/handler/CollectionsAPI.java
@@ -32,7 +32,6 @@ import static org.apache.solr.security.PermissionNameProvider.Name.COLL_READ_PER
 
 /**
  * All V2 APIs for collection management
- *
  */
 public class CollectionsAPI {
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6f00a4a..15b558b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -88,6 +88,10 @@ import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.BackupProperties;
 import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
@@ -158,12 +162,17 @@ import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STA
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.params.CommonParams.TIMING;
 import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
+import static org.apache.solr.common.params.CoreAdminParams.BACKUP_ID;
+import static org.apache.solr.common.params.CoreAdminParams.BACKUP_LOCATION;
+import static org.apache.solr.common.params.CoreAdminParams.BACKUP_PURGE_UNUSED;
+import static org.apache.solr.common.params.CoreAdminParams.BACKUP_REPOSITORY;
 import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_DATA_DIR;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_INDEX;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_INSTANCE_DIR;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_METRICS_HISTORY;
 import static org.apache.solr.common.params.CoreAdminParams.INSTANCE_DIR;
+import static org.apache.solr.common.params.CoreAdminParams.MAX_NUM_BACKUP_POINTS;
 import static org.apache.solr.common.params.CoreAdminParams.ULOG_DIR;
 import static org.apache.solr.common.params.ShardParams._ROUTE_;
 import static org.apache.solr.common.util.StrUtils.formatString;
@@ -1102,14 +1111,14 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
 
       CoreContainer cc = h.coreContainer;
-      String repo = req.getParams().get(CoreAdminParams.BACKUP_REPOSITORY);
+      String repo = req.getParams().get(BACKUP_REPOSITORY);
       BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
 
-      String location = repository.getBackupLocation(req.getParams().get(CoreAdminParams.BACKUP_LOCATION));
+      String location = repository.getBackupLocation(req.getParams().get(BACKUP_LOCATION));
       if (location == null) {
         //Refresh the cluster property file to make sure the value set for location is the latest
         // Check if the location is specified in the cluster property.
-        location = new ClusterProperties(h.coreContainer.getZkController().getZkClient()).getClusterProperty(CoreAdminParams.BACKUP_LOCATION, null);
+        location = new ClusterProperties(h.coreContainer.getZkController().getZkClient()).getClusterProperty(BACKUP_LOCATION, null);
         if (location == null) {
           throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
               + " parameter or as a default repository property or as a cluster property.");
@@ -1134,10 +1143,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
 
       Map<String, Object> params = copy(req.getParams(), null, NAME, COLLECTION_PROP,
-              FOLLOW_ALIASES, CoreAdminParams.COMMIT_NAME, CoreAdminParams.MAX_NUM_BACKUP_POINTS);
-      params.put(CoreAdminParams.BACKUP_LOCATION, location);
+              FOLLOW_ALIASES, CoreAdminParams.COMMIT_NAME, MAX_NUM_BACKUP_POINTS);
+      params.put(BACKUP_LOCATION, location);
       if (repo != null) {
-        params.put(CoreAdminParams.BACKUP_REPOSITORY, repo);
+        params.put(BACKUP_REPOSITORY, repo);
       }
 
       params.put(CollectionAdminParams.INDEX_BACKUP_STRATEGY, strategy);
@@ -1158,10 +1167,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
 
       final CoreContainer cc = h.coreContainer;
-      final String repo = req.getParams().get(CoreAdminParams.BACKUP_REPOSITORY);
+      final String repo = req.getParams().get(BACKUP_REPOSITORY);
       final BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
 
-      String location = repository.getBackupLocation(req.getParams().get(CoreAdminParams.BACKUP_LOCATION));
+      String location = repository.getBackupLocation(req.getParams().get(BACKUP_LOCATION));
       if (location == null) {
         //Refresh the cluster property file to make sure the value set for location is the latest
         // Check if the location is specified in the cluster property.
@@ -1195,16 +1204,123 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
 
       final Map<String, Object> params = copy(req.getParams(), null, NAME, COLLECTION_PROP);
-      params.put(CoreAdminParams.BACKUP_LOCATION, location);
+      params.put(BACKUP_LOCATION, location);
       if (repo != null) {
-        params.put(CoreAdminParams.BACKUP_REPOSITORY, repo);
+        params.put(BACKUP_REPOSITORY, repo);
       }
       // from CREATE_OP:
       copy(req.getParams(), params, COLL_CONF, REPLICATION_FACTOR, NRT_REPLICAS, TLOG_REPLICAS,
-          PULL_REPLICAS, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
+          PULL_REPLICAS, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
+          BACKUP_ID);
       copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
       return params;
     }),
+    DELETEBACKUP_OP(DELETEBACKUP, (req, rsp, h) -> {
+      req.getParams().required().check(NAME);
+
+      CoreContainer cc = h.coreContainer;
+      String repo = req.getParams().get(BACKUP_REPOSITORY);
+      try (BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo))) {
+
+        String location = repository.getBackupLocation(req.getParams().get(BACKUP_LOCATION));
+        if (location == null) {
+          //Refresh the cluster property file to make sure the value set for location is the latest
+          // Check if the location is specified in the cluster property.
+          location = new ClusterProperties(h.coreContainer.getZkController().getZkClient()).getClusterProperty(BACKUP_LOCATION, null);
+          if (location == null) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+                    + " parameter or as a default repository property or as a cluster property.");
+          }
+        }
+
+        // Check if the specified location is valid for this repository.
+        URI uri = repository.createURI(location);
+        try {
+          if (!repository.exists(uri)) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "specified location " + uri + " does not exist.");
+          }
+        } catch (IOException ex) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to check the existance of " + uri + ". Is it valid?", ex);
+        }
+
+        int deletionModesProvided = 0; // the three deletion modes are mutually exclusive; count how many were requested
+        if (req.getParams().get(MAX_NUM_BACKUP_POINTS) != null) deletionModesProvided++;
+        if (req.getParams().get(BACKUP_PURGE_UNUSED) != null) deletionModesProvided++;
+        if (req.getParams().get(BACKUP_ID) != null) deletionModesProvided++;
+        if (deletionModesProvided != 1) {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+                  String.format(Locale.ROOT, "Exactly one of %s, %s, and %s parameters must be provided",
+                          MAX_NUM_BACKUP_POINTS, BACKUP_PURGE_UNUSED, BACKUP_ID));
+        }
+
+        final Map<String, Object> params = copy(req.getParams(), null, NAME, BACKUP_REPOSITORY,
+                BACKUP_LOCATION, BACKUP_ID, MAX_NUM_BACKUP_POINTS, BACKUP_PURGE_UNUSED);
+        params.put(BACKUP_LOCATION, location); // overwrite with the resolved location (request, repository or cluster default)
+        if (repo != null) {
+          params.put(BACKUP_REPOSITORY, repo);
+        }
+        return params;
+      }
+    }),
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    LISTBACKUP_OP(LISTBACKUP, (req, rsp, h) -> {
+      req.getParams().required().check(NAME);
+
+      CoreContainer cc = h.coreContainer;
+      String repo = req.getParams().get(BACKUP_REPOSITORY);
+      try (BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo))) {
+
+        String location = repository.getBackupLocation(req.getParams().get(BACKUP_LOCATION));
+        if (location == null) {
+          //Refresh the cluster property file to make sure the value set for location is the latest
+          // Check if the location is specified in the cluster property.
+          location = new ClusterProperties(h.coreContainer.getZkController().getZkClient()).getClusterProperty(BACKUP_LOCATION, null);
+          if (location == null) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+                    + " parameter or as a default repository property or as a cluster property.");
+          }
+        }
+
+        String backupName = req.getParams().get(NAME);
+        final URI locationURI = repository.createURI(location);
+        try {
+          if (!repository.exists(locationURI)) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "specified location " + locationURI + " does not exist.");
+          }
+        } catch (IOException ex) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to check the existance of " + locationURI + ". Is it valid?", ex);
+        }
+        URI backupLocation = BackupFilePaths.buildExistingBackupLocationURI(repository, locationURI, backupName);
+        if (repository.exists(repository.resolve(backupLocation, BackupManager.TRADITIONAL_BACKUP_PROPS_FILE))) { // legacy layout
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup name [" + backupName + "] at " +
+                  "location [" + location + "] holds a non-incremental (legacy) backup, but " +
+                  "backup-listing is only supported on incremental backups");
+        }
+
+        String[] subFiles = repository.listAllOrEmpty(backupLocation);
+        List<BackupId> propsFiles = BackupFilePaths.findAllBackupIdsFromFileListing(subFiles);
+
+        NamedList<Object> results = new NamedList<>();
+        ArrayList<Map> backups = new ArrayList<>();
+        String collectionName = null;
+        for (BackupId backupId: propsFiles) {
+          BackupProperties properties = BackupProperties.readFrom(repository, backupLocation, BackupFilePaths.getBackupPropsName(backupId));
+          if (collectionName == null) { // report the collection name once, taken from the first backup point read
+            collectionName = properties.getCollection();
+            results.add(BackupManager.COLLECTION_NAME_PROP, collectionName);
+          }
+
+          Map<Object, Object> details = properties.getDetails();
+          details.put("backupId", backupId.id);
+          backups.add(details);
+        }
+
+        results.add("backups", backups);
+        SolrResponse response = new OverseerSolrResponse(results);
+        rsp.getValues().addAll(response.getResponse());
+        return null; // the results were already written to rsp above
+      }
+    }),
     CREATESNAPSHOT_OP(CREATESNAPSHOT, (req, rsp, h) -> {
       req.getParams().required().check(COLLECTION_PROP, CoreAdminParams.COMMIT_NAME);
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/BackupRestoreApiErrorConditionsTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/BackupRestoreApiErrorConditionsTest.java
new file mode 100644
index 0000000..fc6ff06
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/BackupRestoreApiErrorConditionsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+
+
+/**
+ * Integration test verifying particular errors are reported correctly by the Collection-level backup/restore APIs
+ */
+public class BackupRestoreApiErrorConditionsTest extends SolrCloudTestCase {
+  // TODO could these be unit tests somehow and still validate the response users see with certainty
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int NUM_SHARDS = 1;
+  private static final int NUM_REPLICAS = 1;
+  private static final String COLLECTION_NAME = "initial_collection";
+  private static final String BACKUP_NAME = "backup_name";
+  private static final String VALID_REPOSITORY_NAME = "local";
+  private static final long ASYNC_COMMAND_WAIT_PERIOD_MILLIS = 10 * 1000;
+
+  private static String validBackupLocation;
+
+  /*
+   * Creates a single node cluster with an empty collection COLLECTION_NAME, a configured BackupRepository named
+   * VALID_REPOSITORY_NAME, and an existing incremental backup at location 'validBackupLocation'
+   */
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    System.setProperty("solr.allowPaths", "*");
+    validBackupLocation = createTempDir().toAbsolutePath().toString();
+
+    String solrXml = MiniSolrCloudCluster.DEFAULT_CLOUD_SOLR_XML;
+    String local =
+            "<backup>" +
+                    "<repository  name=\"local\" class=\"org.apache.solr.core.backup.repository.LocalFileSystemRepository\">" +
+                    "</repository>" +
+                    "</backup>";
+    solrXml = solrXml.replace("</solr>", local + "</solr>");
+
+    configureCluster(NUM_SHARDS)// nodes
+            .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+            .withSolrXml(solrXml)
+            .configure();
+
+    final RequestStatusState createState = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", NUM_SHARDS, NUM_REPLICAS)
+            .processAndWait(cluster.getSolrClient(), ASYNC_COMMAND_WAIT_PERIOD_MILLIS);
+    assertEquals(RequestStatusState.COMPLETED, createState);
+
+    final RequestStatusState backupState = CollectionAdminRequest.backupCollection(COLLECTION_NAME, BACKUP_NAME)
+            .setRepositoryName(VALID_REPOSITORY_NAME)
+            .setLocation(validBackupLocation)
+            .processAndWait(cluster.getSolrClient(), ASYNC_COMMAND_WAIT_PERIOD_MILLIS);
+    assertEquals(RequestStatusState.COMPLETED, backupState);
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    System.clearProperty("solr.allowPaths");
+  }
+
+  @Test
+  public void testBackupOperationsReportErrorWhenUnknownBackupRepositoryRequested() throws Exception {
+    // Check message for create-backup
+    Exception e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.backupCollection(COLLECTION_NAME, BACKUP_NAME)
+              .setRepositoryName("some-nonexistent-repo-name")
+              .setLocation(validBackupLocation)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("Could not find a backup repository with name some-nonexistent-repo-name"));
+
+    // Check message for list-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.listBackup(BACKUP_NAME)
+              .setBackupLocation(validBackupLocation)
+              .setBackupRepository("some-nonexistent-repo-name")
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("Could not find a backup repository with name some-nonexistent-repo-name"));
+
+    // Check message for delete-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.deleteBackupById(BACKUP_NAME, 1)
+              .setLocation(validBackupLocation)
+              .setRepositoryName("some-nonexistent-repo-name")
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("Could not find a backup repository with name some-nonexistent-repo-name"));
+
+    // Check message for restore-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.restoreCollection(COLLECTION_NAME + "_restored", BACKUP_NAME)
+              .setLocation(validBackupLocation)
+              .setRepositoryName("some-nonexistent-repo-name")
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("Could not find a backup repository with name some-nonexistent-repo-name"));
+  }
+
+  @Test
+  public void testBackupOperationsReportErrorWhenNonexistentLocationProvided() {
+    // Check message for create-backup. Every request in this test targets a never-created child directory of the valid location.
+    Exception e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.backupCollection(COLLECTION_NAME, BACKUP_NAME)
+            .setRepositoryName(VALID_REPOSITORY_NAME)
+            .setLocation(validBackupLocation + File.separator + "someNonexistentLocation")
+            .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("specified location"));
+    assertTrue(e.getMessage().contains("does not exist"));
+
+    // Check message for list-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.listBackup(BACKUP_NAME)
+              .setBackupLocation(validBackupLocation + File.separator + "someNonexistentLocation")
+              .setBackupRepository(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("specified location"));
+    assertTrue(e.getMessage().contains("does not exist"));
+
+    // Check message for delete-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.deleteBackupById(BACKUP_NAME, 1)
+              .setLocation(validBackupLocation + File.separator + "someNonexistentLocation")
+              .setRepositoryName(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("specified location"));
+    assertTrue(e.getMessage().contains("does not exist"));
+
+    // Check message for restore-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.restoreCollection(COLLECTION_NAME + "_restored", BACKUP_NAME)
+              .setLocation(validBackupLocation + File.separator + "someNonexistentLocation")
+              .setRepositoryName(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("specified location"));
+    assertTrue(e.getMessage().contains("does not exist"));
+  }
+
+  @Test
+  public void testListAndDeleteFailOnOldBackupLocations() throws Exception {
+    final String nonIncrementalBackupLocation = createTempDir().toAbsolutePath().toString();
+    final RequestStatusState backupState = CollectionAdminRequest.backupCollection(COLLECTION_NAME, BACKUP_NAME)
+            .setRepositoryName(VALID_REPOSITORY_NAME)
+            .setLocation(nonIncrementalBackupLocation)
+            .setIncremental(false)
+            .processAndWait(cluster.getSolrClient(), ASYNC_COMMAND_WAIT_PERIOD_MILLIS);
+    assertEquals(RequestStatusState.COMPLETED, backupState);
+
+    // Check message for list-backup
+    Exception e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.listBackup(BACKUP_NAME)
+              .setBackupLocation(nonIncrementalBackupLocation)
+              .setBackupRepository(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("The backup name [backup_name] at location"));
+    assertTrue(e.getMessage().contains("holds a non-incremental (legacy) backup, but backup-listing is only supported on incremental backups"));
+
+    // Check message for delete-backup
+    e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.deleteBackupById(BACKUP_NAME, 1)
+              .setLocation(nonIncrementalBackupLocation)
+              .setRepositoryName(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("The backup name [backup_name] at location"));
+    assertTrue(e.getMessage().contains("holds a non-incremental (legacy) backup, but backup-deletion is only supported on incremental backups"));
+  }
+
+  @Test
+  public void testDeleteFailsOnNonexistentBackupId() {
+    Exception e = expectThrows(Exception.class, () -> {
+      CollectionAdminRequest.deleteBackupById(BACKUP_NAME, 123)
+              .setLocation(validBackupLocation)
+              .setRepositoryName(VALID_REPOSITORY_NAME)
+              .process(cluster.getSolrClient());
+    });
+    assertTrue(e.getMessage().contains("Backup ID [123] not found; cannot be deleted"));
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/PurgeGraphTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/PurgeGraphTest.java
new file mode 100644
index 0000000..0f1f2a0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/PurgeGraphTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ObjectArrays;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.api.collections.DeleteBackupCmd.PurgeGraph;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PurgeGraphTest extends SolrTestCaseJ4 {
+    private static final String[] shardBackupIds = new String[]{"b1_s1", "b1_s2", "b2_s1", "b2_s2", "b3_s1", "b3_s2", "b3_s3"};
+
+    @Test
+    public void test() throws URISyntaxException, IOException {
+        assumeWorkingMockito();
+        BackupRepository repository = mock(BackupRepository.class);
+        BackupFilePaths paths = mock(BackupFilePaths.class);
+        when(paths.getBackupLocation()).thenReturn(new URI("/temp"));
+        when(paths.getIndexDir()).thenReturn(new URI("/temp/index"));
+        when(paths.getShardBackupMetadataDir()).thenReturn(new URI("/temp/backup_point"));
+
+        PurgeGraph purgeGraph = new PurgeGraph();
+        buildCompleteGraph(repository, paths, purgeGraph);
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(0, purgeGraph.backupIdDeletes.size());
+        assertEquals(0, purgeGraph.shardBackupMetadataDeletes.size());
+        assertEquals(0, purgeGraph.indexFileDeletes.size());
+
+        testDeleteUnreferencedFiles(repository, paths, purgeGraph);
+        testMissingBackupPointFiles(repository, paths);
+        testMissingIndexFiles(repository, paths);
+    }
+
+    private void testMissingIndexFiles(BackupRepository repository, BackupFilePaths paths) throws IOException {
+        PurgeGraph purgeGraph = new PurgeGraph();
+        buildCompleteGraph(repository, paths, purgeGraph);
+
+        Set<String> indexFiles = purgeGraph.indexFileNodeMap.keySet();
+        when(repository.listAllOrEmpty(same(paths.getIndexDir()))).thenAnswer((Answer<String[]>) invocationOnMock -> {
+            Set<String> newFiles = new HashSet<>(indexFiles);
+            newFiles.remove("s1_102");
+            return newFiles.toArray(new String[0]);
+        });
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(3, purgeGraph.backupIdDeletes.size());
+        assertEquals(shardBackupIds.length, purgeGraph.shardBackupMetadataDeletes.size());
+        assertEquals(purgeGraph.indexFileNodeMap.size(), purgeGraph.indexFileDeletes.size() + 1);
+
+        purgeGraph = new PurgeGraph();
+        buildCompleteGraph(repository, paths, purgeGraph);
+
+        Set<String> indexFiles2 = purgeGraph.indexFileNodeMap.keySet();
+        when(repository.listAllOrEmpty(same(paths.getIndexDir()))).thenAnswer((Answer<String[]>) invocationOnMock -> {
+            Set<String> newFiles = new HashSet<>(indexFiles2);
+            newFiles.remove("s1_101");
+            return newFiles.toArray(new String[0]);
+        });
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(2, purgeGraph.backupIdDeletes.size());
+        assertEquals(4, purgeGraph.shardBackupMetadataDeletes.size());
+        assertTrue(purgeGraph.indexFileDeletes.contains("s1_100"));
+        assertFalse(purgeGraph.indexFileDeletes.contains("s1_101"));
+    }
+
+    private void testMissingBackupPointFiles(BackupRepository repository, BackupFilePaths paths) throws IOException {
+        PurgeGraph purgeGraph = new PurgeGraph();
+        buildCompleteGraph(repository, paths, purgeGraph);
+        when(repository.listAllOrEmpty(same(paths.getShardBackupMetadataDir()))).thenAnswer((Answer<String[]>)
+                invocationOnMock -> Arrays.copyOfRange(shardBackupIds, 1, shardBackupIds.length)
+        );
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(1, purgeGraph.backupIdDeletes.size());
+        assertEquals("b1", purgeGraph.backupIdDeletes.get(0));
+        assertEquals(1, purgeGraph.shardBackupMetadataDeletes.size());
+        assertEquals("b1_s2", purgeGraph.shardBackupMetadataDeletes.get(0));
+        assertTrue(purgeGraph.indexFileDeletes.contains("s1_100"));
+        assertFalse(purgeGraph.indexFileDeletes.contains("s1_101"));
+
+        purgeGraph = new PurgeGraph();
+        buildCompleteGraph(repository, paths, purgeGraph);
+        when(repository.listAllOrEmpty(same(paths.getShardBackupMetadataDir()))).thenAnswer((Answer<String[]>)
+                invocationOnMock -> new String[]{"b1_s1", "b2_s1", "b3_s1", "b3_s2", "b3_s3"}
+        );
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(2, purgeGraph.backupIdDeletes.size());
+        assertTrue(purgeGraph.backupIdDeletes.containsAll(Arrays.asList("b1", "b2")));
+        assertEquals(2, purgeGraph.shardBackupMetadataDeletes.size());
+        assertTrue(purgeGraph.shardBackupMetadataDeletes.containsAll(Arrays.asList("b2_s1", "b1_s1")));
+        assertTrue(purgeGraph.indexFileDeletes.containsAll(Arrays.asList("s1_100", "s1_101")));
+        assertFalse(purgeGraph.indexFileDeletes.contains("s1_102"));
+    }
+
+    private void testDeleteUnreferencedFiles(BackupRepository repository, BackupFilePaths paths,
+                                             PurgeGraph purgeGraph) throws IOException {
+        buildCompleteGraph(repository, paths, purgeGraph);
+        String[] unRefBackupPoints = addUnRefFiles(repository, "b4_s", paths.getShardBackupMetadataDir());
+        String[] unRefIndexFiles = addUnRefFiles(repository, "s4_", paths.getIndexDir());
+
+        purgeGraph.findDeletableNodes(repository, paths);
+
+        assertEquals(0, purgeGraph.backupIdDeletes.size());
+        assertEquals(unRefBackupPoints.length, purgeGraph.shardBackupMetadataDeletes.size());
+        assertTrue(purgeGraph.shardBackupMetadataDeletes.containsAll(Arrays.asList(unRefBackupPoints)));
+        assertEquals(unRefIndexFiles.length, purgeGraph.indexFileDeletes.size());
+        assertTrue(purgeGraph.indexFileDeletes.containsAll(Arrays.asList(unRefIndexFiles)));
+    }
+
+    private String[] addUnRefFiles(BackupRepository repository, String prefix, URI dir) {
+        String[] unRefBackupPoints = new String[random().nextInt(10) + 1];
+        for (int i = 0; i < unRefBackupPoints.length; i++) {
+            unRefBackupPoints[i] = prefix + (100 + i);
+        }
+        String[] shardBackupMetadataFiles = repository.listAllOrEmpty(dir);
+        when(repository.listAllOrEmpty(same(dir)))
+                .thenAnswer((Answer<String[]>) invocation
+                        -> ObjectArrays.concat(shardBackupMetadataFiles, unRefBackupPoints, String.class));
+        return unRefBackupPoints;
+    }
+
+    private void buildCompleteGraph(BackupRepository repository, BackupFilePaths paths,
+                                    PurgeGraph purgeGraph) throws IOException {
+        when(repository.listAllOrEmpty(same(paths.getShardBackupMetadataDir()))).thenAnswer((Answer<String[]>) invocationOnMock -> shardBackupIds);
+        // Link each shard-backup node to its backup-id node (first two chars, e.g. "b1") and to randomly named index files
+
+        for (String shardBackupId : shardBackupIds) {
+            purgeGraph.addEdge(purgeGraph.getShardBackupIdNode(shardBackupId),
+                    purgeGraph.getBackupIdNode(shardBackupId.substring(0, 2)));
+            for (int i = 0, fileCount = random().nextInt(30); i < fileCount; i++) {
+                String fileName = shardBackupId.substring(3) + "_" + random().nextInt(15);
+                purgeGraph.addEdge(purgeGraph.getShardBackupIdNode(shardBackupId),
+                        purgeGraph.getIndexFileNode(fileName));
+            }
+        }
+        // s1_100: referenced only by b1_s1 (random suffixes stay below 15, so they never collide with 100-102)
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b1_s1"),
+                purgeGraph.getIndexFileNode("s1_100"));
+        // s1_101: shared by b1_s1 and b2_s1
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b1_s1"),
+                purgeGraph.getIndexFileNode("s1_101"));
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b2_s1"),
+                purgeGraph.getIndexFileNode("s1_101"));
+        // s1_102: shared by b1_s1, b2_s1 and b3_s1
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b1_s1"),
+                purgeGraph.getIndexFileNode("s1_102"));
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b2_s1"),
+                purgeGraph.getIndexFileNode("s1_102"));
+        purgeGraph.addEdge(purgeGraph.getShardBackupIdNode("b3_s1"),
+                purgeGraph.getIndexFileNode("s1_102"));
+        // The mocked index-dir listing returns whatever index files the graph knows about when it is invoked
+        when(repository.listAllOrEmpty(same(paths.getIndexDir()))).thenAnswer((Answer<String[]>) invocationOnMock ->
+                purgeGraph.indexFileNodeMap.keySet().toArray(new String[0]));
+    }
+}
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/V2CollectionBackupsAPIMappingTest.java b/solr/core/src/test/org/apache/solr/handler/admin/V2CollectionBackupsAPIMappingTest.java
new file mode 100644
index 0000000..a12b316
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/admin/V2CollectionBackupsAPIMappingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.admin;
+
+import com.google.common.collect.Maps;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.api.Api;
+import org.apache.solr.api.ApiBag;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.CommandOperation;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.handler.CollectionBackupsAPI;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.common.params.CommonParams.ACTION;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class V2CollectionBackupsAPIMappingTest extends SolrTestCaseJ4 {
+
+  private ApiBag apiBag;
+  private ArgumentCaptor<SolrQueryRequest> queryRequestCaptor;
+  private CollectionsHandler mockCollectionsHandler;
+
+  @BeforeClass
+  public static void ensureWorkingMockito() {
+    assumeWorkingMockito();
+  }
+
+  @Before
+  public void setupApiBag() throws Exception {
+    mockCollectionsHandler = mock(CollectionsHandler.class);
+    queryRequestCaptor = ArgumentCaptor.forClass(SolrQueryRequest.class);
+
+    apiBag = new ApiBag(false);
+    final CollectionBackupsAPI collBackupsAPI = new CollectionBackupsAPI(mockCollectionsHandler);
+    apiBag.registerObject(collBackupsAPI);
+  }
+
+  @Test
+  public void testDeleteBackupsAllParams() throws Exception {
+    final SolrParams v1Params = captureConvertedV1Params("/collections/backups", "POST",
+            "{'delete-backups': {" +
+                    "'name': 'backupName', " +
+                    "'location': '/some/location/uri', " +
+                    "'repository': 'someRepository', " +
+                    "'backupId': 123, " +
+                    "'maxNumBackupPoints': 456, " +
+                    "'purgeUnused': true, " +
+                    "'async': 'requestTrackingId'" +
+                    "}}");
+    // Every field of the v2 'delete-backups' payload must surface as the matching v1 parameter
+    assertEquals(CollectionParams.CollectionAction.DELETEBACKUP.lowerName, v1Params.get(ACTION));
+    assertEquals("backupName", v1Params.get(NAME));
+    assertEquals("/some/location/uri", v1Params.get(CoreAdminParams.BACKUP_LOCATION));
+    assertEquals("someRepository", v1Params.get(CoreAdminParams.BACKUP_REPOSITORY));
+    assertEquals(123, v1Params.getPrimitiveInt(CoreAdminParams.BACKUP_ID));
+    assertEquals(456, v1Params.getPrimitiveInt(CoreAdminParams.MAX_NUM_BACKUP_POINTS));
+    assertTrue(v1Params.getPrimitiveBool(CoreAdminParams.BACKUP_PURGE_UNUSED));
+    assertEquals("requestTrackingId", v1Params.get(CommonAdminParams.ASYNC));
+  }
+
+  @Test
+  public void testListBackupsAllParams() throws Exception {
+    final SolrParams v1Params = captureConvertedV1Params("/collections/backups", "POST",
+            "{'list-backups': {" +
+                    "'name': 'backupName', " +
+                    "'location': '/some/location/uri', " +
+                    "'repository': 'someRepository' " +
+                    "}}");
+
+    assertEquals(CollectionParams.CollectionAction.LISTBACKUP.lowerName, v1Params.get(ACTION));
+    assertEquals("backupName", v1Params.get(NAME));
+    assertEquals("/some/location/uri", v1Params.get(CoreAdminParams.BACKUP_LOCATION));
+    assertEquals("someRepository", v1Params.get(CoreAdminParams.BACKUP_REPOSITORY));
+  }
+
+  private SolrParams captureConvertedV1Params(String path, String method, String v2RequestBody) throws Exception {
+    final HashMap<String, String> parts = new HashMap<>();
+    final Api api = apiBag.lookup(path, method, parts);
+    final SolrQueryResponse rsp = new SolrQueryResponse();
+    final LocalSolrQueryRequest req = new LocalSolrQueryRequest(null, Maps.newHashMap()) {
+      @Override
+      public List<CommandOperation> getCommands(boolean validateInput) {
+        if (v2RequestBody == null) return Collections.emptyList();
+        return ApiBag.getCommandOperations(new ContentStreamBase.StringStream(v2RequestBody), api.getCommandSchema(), true);
+      }
+
+      @Override
+      public Map<String, String> getPathTemplateValues() {
+        return parts;
+      }
+
+      @Override
+      public String getHttpMethod() {
+        return method;
+      }
+    };
+
+
+    api.call(req, rsp);
+    verify(mockCollectionsHandler).handleRequestBody(queryRequestCaptor.capture(), any());
+    return queryRequestCaptor.getValue().getParams();
+  }
+}
diff --git a/solr/solr-ref-guide/src/collection-management.adoc b/solr/solr-ref-guide/src/collection-management.adoc
index dfcd205..118325c 100644
--- a/solr/solr-ref-guide/src/collection-management.adoc
+++ b/solr/solr-ref-guide/src/collection-management.adoc
@@ -1286,6 +1286,109 @@ A boolean parameter allowing users to choose whether to create an incremental (`
 If unspecified, backups are done incrementally by default.
 Incremental backups are preferred in all known circumstances and snapshot backups are deprecated, so this parameter should only be used after much consideration.
 
+[[listbackup]]
+== LISTBACKUP: List Backups
+
+Lists information about each backup stored at the specified repository location.
+Basic metadata is returned about each backup, including: the time at which the backup was created, the Lucene version used to create the index, and the size of the backup both in number of files and total filesize.
+
+[NOTE]
+====
+Previous versions of Solr supported a different snapshot-based backup file structure that did not support the storage of multiple backups at the same location.
+Solr can still restore backups stored in this old format, but it is deprecated and will be removed in subsequent versions of Solr.
+The LISTBACKUP API does not support the deprecated format and attempts to use this API on a location holding an older backup will result in an error message.
+====
+
+The file structure used by Solr internally to represent backups changed in 8.9.0.
+While backups created prior to this format change can still be restored, the `LISTBACKUP` and `DELETEBACKUP` API commands are only valid on this newer format.
+Attempting to use them on a location holding an older backup will result in an error message.
+
+=== LISTBACKUP Parameters
+
+`name`::
+The name of the backups to list.
+The backup name usually corresponds to the collection-name, but isn't required to.
+This parameter is required.
+
+`location`::
+The repository location to list backups from. This parameter is required, unless a default location is defined on the repository configuration, or set as a <<cluster-node-management.adoc#clusterprop,cluster property>>.
++
+If the location path is on a mounted drive, the mount must be available on the node that serves as the overseer, even if the overseer node does not host a replica of the collection being backed up.
+Since any node can take the overseer role at any time, a best practice to avoid possible backup failures is to ensure the mount point is available on all nodes of the cluster.
+
+`repository`::
+The name of a repository to be used for accessing backup information.
+If no repository is specified then the local filesystem repository will be used automatically.
+
+`async`::
+Request ID to track this action which will be <<collections-api.adoc#asynchronous-calls,processed asynchronously>>.
+
+=== LISTBACKUP Example
+
+*Input*
+
+[.dynamic-tabs]
+--
+[example.tab-pane#v1listbackup]
+====
+[.tab-label]*V1 API*
+
+[source,bash]
+----
+http://localhost:8983/solr/admin/collections?action=LISTBACKUP&name=myBackupName&location=/path/to/my/shared/drive
+----
+====
+
+[example.tab-pane#v2listbackup]
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+POST http://localhost:8983/v2/collections/backups
+{
+  "list-backups" : {
+    "name": "myBackupName",
+    "location": "/path/to/my/shared/drive"
+  }
+}
+----
+====
+--
+
+*Output*
+
+[source,json]
+----
+{
+  "responseHeader":{
+    "status":0,
+    "QTime":4},
+  "collection":"books",
+  "backups":[{
+      "indexFileCount":0,
+      "indexSizeMB":0.0,
+      "shardBackupIds":{
+        "shard2":"md_shard2_0.json",
+        "shard1":"md_shard1_0.json"},
+      "collection.configName":"books",
+      "backupId":0,
+      "collectionAlias":"books",
+      "startTime":"2021-02-09T03:19:52.085653Z",
+      "indexVersion":"9.0.0"},
+    {
+      "indexFileCount":0,
+      "indexSizeMB":0.0,
+      "shardBackupIds":{
+        "shard2":"md_shard2_1.json",
+        "shard1":"md_shard1_1.json"},
+      "collection.configName":"books",
+      "backupId":1,
+      "collectionAlias":"books",
+      "startTime":"2021-02-09T03:19:52.268804Z",
+      "indexVersion":"9.0.0"}]}
+----
+
 [[restore]]
 == RESTORE: Restore Collection
 
@@ -1334,6 +1437,108 @@ Note: for `createNodeSet` the special value of `EMPTY` is not allowed with this
 Additionally, there are several parameters that may have been set on the original collection that can be overridden when restoring the backup (described in detail in the <<create,CREATE collection>> section):
 `collection.configName`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `property._name_=_value_`.
 
+[[deletebackup]]
+== DELETEBACKUP: Delete backup files from the remote repository
+
+Deletes backup files stored at the specified repository location.
+
+[NOTE]
+====
+Previous versions of Solr supported a different snapshot-based backup file structure that did not support the storage of multiple backups at the same location.
+Solr can still restore backups stored in this old format, but it is deprecated and will be removed in subsequent versions of Solr.
+The DELETEBACKUP API does not support the deprecated format and attempts to use this API on a location holding an older backup will result in an error message.
+====
+
+Solr allows storing multiple backups for the same collection at any given logical "location".
+These backup points are each given an identifier (`backupId`) which can be used to delete them specifically with this API.
+Alternatively, Solr can be told to keep only the most recent `maxNumBackupPoints` backups, deleting everything else at the given location.
+Deleting backup points in these ways can orphan index files that are no longer referenced by any backup points.
+These orphaned files can be detected and deleted using the `purgeUnused` option.
+See the parameter descriptions below for more information.
+
+=== DELETEBACKUP Example
+
+*Input*
+
+The following API command deletes the first backup (`backupId=0`) at the specified repository location.
+
+[.dynamic-tabs]
+--
+[example.tab-pane#v1deletebackup]
+====
+[.tab-label]*V1 API*
+
+[source,bash]
+----
+http://localhost:8983/solr/admin/collections?action=DELETEBACKUP&name=myBackupName&location=/path/to/my/shared/drive&backupId=0
+----
+====
+
+[example.tab-pane#v2deletebackup]
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+POST http://localhost:8983/v2/collections/backups
+{
+  "delete-backups" : {
+    "name": "myBackupName",
+    "location": "/path/to/my/shared/drive",
+    "backupId": 0
+  }
+}
+----
+====
+--
+
+*Output*
+
+[source,json]
+----
+{
+  "responseHeader":{
+    "status":0,
+    "QTime":940},
+  "deleted":[[
+      "startTime","2021-02-09T03:19:52.085653Z",
+      "backupId",0,
+      "size",28381,
+      "numFiles",53]],
+  "collection":"books"}
+----
+
+=== DELETEBACKUP Parameters
+
+`name`::
+The backup name to delete backup files from.  This parameter is required.
+
+`location`::
+The repository location to delete backups from. This parameter is required, unless a default location is defined on the repository configuration, or set as a <<cluster-node-management.adoc#clusterprop,cluster property>>.
++
+If the location path is on a mounted drive, the mount must be available on the node that serves as the overseer, even if the overseer node does not host a replica of the collection whose backups are being deleted.
+Since any node can take the overseer role at any time, a best practice to avoid possible failures is to ensure the mount point is available on all nodes of the cluster.
+
+`repository`::
+The name of a repository to be used for deleting backup files. If no repository is specified then the local filesystem repository will be used automatically.
+
+`backupId`::
+Explicitly specify a single backup-ID to delete.
+Only one of `backupId`, `maxNumBackupPoints`, and `purgeUnused` may be specified per DELETEBACKUP request.
+
+`maxNumBackupPoints`::
+Specify how many backups should be retained, deleting all others.
+Only one of `backupId`, `maxNumBackupPoints`, and `purgeUnused` may be specified per DELETEBACKUP request.
+
+`purgeUnused`::
+Solr's incremental backup support can orphan files if the backups referencing them are deleted.
+The `purgeUnused` flag parameter triggers a scan to detect these orphaned files and delete them.
+Administrators doing repeated backups at the same location should plan on using this parameter periodically to reclaim disk space.
+Only one of `backupId`, `maxNumBackupPoints`, and `purgeUnused` may be specified per DELETEBACKUP request.
+
+`async`::
+Request ID to track this action which will be <<collections-api.adoc#asynchronous-calls,processed asynchronously>>.
+
 [[rebalanceleaders]]
 == REBALANCELEADERS: Rebalance Leaders
 
diff --git a/solr/solr-ref-guide/src/making-and-restoring-backups.adoc b/solr/solr-ref-guide/src/making-and-restoring-backups.adoc
index 5804444..a727c24 100644
--- a/solr/solr-ref-guide/src/making-and-restoring-backups.adoc
+++ b/solr/solr-ref-guide/src/making-and-restoring-backups.adoc
@@ -33,10 +33,12 @@ Support for backups when running SolrCloud is provided with the <<collection-man
 
 NOTE: SolrCloud Backup/Restore requires a shared file system mounted at the same path on all nodes, or HDFS.
 
-Two commands are available:
+Four different API commands are supported:
 
 * `action=BACKUP`: This command backs up Solr indexes and configurations. More information is available in the section <<collection-management.adoc#backup,Backup Collection>>.
 * `action=RESTORE`: This command restores Solr indexes and configurations. More information is available in the section <<collection-management.adoc#restore,Restore Collection>>.
+* `action=LISTBACKUP`: This command lists the backup points available at a specified location, displaying metadata for each.  More information is available in the section <<collection-management.adoc#listbackup,List Backups>>.
+* `action=DELETEBACKUP`: This command allows deletion of backup files or whole backups.  More information is available in the section <<collection-management.adoc#deletebackup,Delete Backups>>.
 
 == Standalone Mode Backups
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 80a1cbc..b39e75f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.request;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -35,6 +36,8 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
+import org.apache.solr.client.solrj.request.beans.DeleteBackupPayload;
+import org.apache.solr.client.solrj.request.beans.ListBackupPayload;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
@@ -53,6 +56,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.V2ApiParams;
 import org.apache.solr.common.util.NamedList;
 
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
@@ -1238,7 +1242,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     /**
      * Specify the ID of the backup-point to restore from.
      *
-     * '-1'q is used by default to have Solr restore from the most recent backup-point.
+     * '-1' is used by default to have Solr restore from the most recent backup-point.
      *
      * Solr can store multiple backup points for a given collection - each identified by a unique backup ID.  Users who
      * want to restore a particular backup-point can specify it using this method.
@@ -2792,6 +2796,228 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   }
 
   /**
+   * Configure a {@link SolrRequest} object to delete a single backup-point by its Backup ID.
+   *
+   * The created request object is only valid on backup locations that use the new "incremental" backup file-format
+   * introduced in Solr 8.9.  It should not be used on locations holding "non-incremental" backups (those created prior
+   * to 8.9, or after 8.9 using the advanced "incremental=false" flag).
+   *
+   * @param backupName the name of the backup that this request should delete a single backup-point from.
+   * @param backupId the ID of the backup-point for this request to delete
+   */
+  public static DeleteBackup deleteBackupById(String backupName, int backupId) {
+    return new DeleteBackup(backupName).setBackupId(backupId);
+  }
+
+  /**
+   * Create a {@link SolrRequest} object to delete all backup-points except the most recent 'N'.
+   *
+   * The created request object is only valid on backup locations that use the new "incremental" backup file-format
+   * introduced in Solr 8.9.  It should not be used on locations holding "non-incremental" backups (those created prior
+   * to 8.9, or after 8.9 using the advanced "incremental=false" flag).
+   *
+   * @param backupName the name of the backup that this request should delete backup-points from.
+   * @param numRecentBackupPointsToRetain the number of "most-recent" backup-points to retain.
+   */
+  public static DeleteBackup deleteBackupByRecency(String backupName, int numRecentBackupPointsToRetain) {
+    return new DeleteBackup(backupName).setMaxNumBackupPoints(numRecentBackupPointsToRetain);
+  }
+
+  /**
+   * Create a {@link SolrRequest} object to delete all unused-files at the backup location.
+   *
+   * The created request object is only valid on backup locations that use the new "incremental" backup file-format
+   * introduced in Solr 8.9.  It should not be used on locations holding "non-incremental" backups (those created prior
+   * to 8.9, or after 8.9 using the advanced "incremental=false" flag).
+   *
+   * @param backupName the name of the backup that this request should delete unused files from.
+   */
+  public static DeleteBackup deleteBackupPurgeUnusedFiles(String backupName) {
+    return new DeleteBackup(backupName).setPurgeUnused();
+  }
+
+  /**
+   * {@link SolrRequest} class for the "Backup Deletion" API.
+   *
+   * Currently the API represented by this class only supports deletion of the new "incremental" backup file-format
+   * introduced in Solr 8.9.  It should not be used on locations holding "non-incremental" backups (those created prior
+   * to 8.9, or after 8.9 using the advanced "incremental=false" flag).
+   */
+  public static class DeleteBackup extends CollectionAdminRequest<CollectionAdminResponse> {
+    private final DeleteBackupPayload deleteBackupPayload;
+
+    private DeleteBackup (String backupName) {
+      super(CollectionAction.DELETEBACKUP);
+
+      deleteBackupPayload = new DeleteBackupPayload();
+      deleteBackupPayload.name = backupName;
+    }
+
+    /**
+     *
+     * @param backupRepository the name of the repository implementation to use for accessing backup information.
+     *                         Defaults to 'LocalFileSystemRepository' if not specified.
+     */
+    public DeleteBackup setRepositoryName(String backupRepository) {
+      deleteBackupPayload.repository = backupRepository;
+      return this;
+    }
+
+    /**
+     *
+     * @param backupLocation the location this request will use when accessing backup information.  This parameter is not
+     *                       required - if not specified on the request, Solr will attempt to read a default location
+     *                       from BackupRepository configuration (solr.xml) and from cluster properties.  If none of these
+     *                       places provide 'location' information an error will be thrown.
+     */
+    public DeleteBackup setLocation(String backupLocation) {
+      deleteBackupPayload.location = backupLocation;
+      return this;
+    }
+
+    /**
+     *
+     * @param backupId the ID of a single backup-point for this request to delete.  Mutually exclusive with the
+     *                 parameters controlled by {@link #setMaxNumBackupPoints(int)} and {@link #setPurgeUnused()}.
+     *
+     * @see #deleteBackupById(String, int)
+     */
+    protected DeleteBackup setBackupId(int backupId) {
+      deleteBackupPayload.backupId = backupId;
+      return this;
+    }
+
+    /**
+     *
+     * @param backupPointsToRetain the number of backup-points to retain, deleting the rest.  Mutually exclusive with
+     *                             the parameters controlled by {@link #setBackupId(int)} and {@link #setPurgeUnused()}.
+     *
+     * @see #deleteBackupByRecency(String, int)
+     */
+    protected DeleteBackup setMaxNumBackupPoints(int backupPointsToRetain) {
+      deleteBackupPayload.maxNumBackupPoints = backupPointsToRetain;
+      return this;
+    }
+
+    /**
+     * Configures the request to delete all unused files.
+     *
+     * Mutually exclusive with the parameters controlled by {@link #setBackupId(int)} and {@link #setMaxNumBackupPoints(int)}
+     *
+     * @see #deleteBackupPurgeUnusedFiles(String)
+     */
+    protected DeleteBackup setPurgeUnused() {
+      deleteBackupPayload.purgeUnused = true;
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
+      params.set(CoreAdminParams.NAME, deleteBackupPayload.name);
+      params.setNonNull(CoreAdminParams.BACKUP_LOCATION, deleteBackupPayload.location);
+      params.setNonNull(CoreAdminParams.BACKUP_REPOSITORY, deleteBackupPayload.repository);
+      params.setNonNull(CoreAdminParams.BACKUP_ID, deleteBackupPayload.backupId);
+      params.setNonNull(CoreAdminParams.MAX_NUM_BACKUP_POINTS, deleteBackupPayload.maxNumBackupPoints);
+      params.setNonNull(CoreAdminParams.BACKUP_PURGE_UNUSED, deleteBackupPayload.purgeUnused);
+      return params;
+    }
+
+    @Override
+    public SolrRequest getV2Request() {
+      if (usev2) {
+        return new V2Request.Builder(V2ApiParams.BACKUPS_API_PATH)
+            .useBinary(useBinaryV2)
+            .withMethod(METHOD.POST)
+            .withPayload(Collections.singletonMap(V2ApiParams.DELETE_BACKUPS_CMD, deleteBackupPayload))
+            .build();
+      }
+      return this;
+    }
+
+    @Override
+    protected CollectionAdminResponse createResponse(SolrClient client) {
+      return new CollectionAdminResponse();
+    }
+  }
+
+  /**
+   * Create a {@link SolrRequest} object to list information about all backup points with the specified name.
+   *
+   * @param backupName the name of the backup that this request should list information about
+   */
+  public static ListBackup listBackup(String backupName) {
+    return new ListBackup(backupName);
+  }
+
+  /**
+   * {@link SolrRequest} class for the "Backup List" API.
+   *
+   * Currently the API represented by this class only supports listing of the new "incremental" backup file-format
+   * introduced in Solr 8.9.  It should not be used on locations holding "non-incremental" backups (those created prior
+   * to 8.9, or after 8.9 using the advanced "incremental=false" flag).
+   */
+  public static class ListBackup extends CollectionAdminRequest<CollectionAdminResponse> {
+    private final ListBackupPayload listPayload;
+
+    private ListBackup(String backupName) {
+      super(CollectionAction.LISTBACKUP);
+
+      this.listPayload = new ListBackupPayload();
+      this.listPayload.name = backupName;
+    }
+
+    /**
+     *
+     * @param backupRepository the name of the repository implementation to use for accessing backup information.
+     *                         Defaults to 'LocalFileSystemRepository' if not specified.
+     */
+    public ListBackup setBackupRepository(String backupRepository) {
+      listPayload.repository = backupRepository;
+      return this;
+    }
+
+    /**
+     *
+     * @param backupLocation the location this request will use when accessing backup information.  This parameter is not
+     *                       required - if not specified on the request, Solr will attempt to read a default location
+     *                       from BackupRepository configuration (solr.xml) and from cluster properties.  If none of these
+     *                       places provide 'location' information an error will be thrown.
+     */
+    public ListBackup setBackupLocation(String backupLocation) {
+      listPayload.location = backupLocation;
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
+      params.set(CoreAdminParams.NAME, listPayload.name);
+      params.setNonNull(CoreAdminParams.BACKUP_LOCATION, listPayload.location);
+      params.setNonNull(CoreAdminParams.BACKUP_REPOSITORY, listPayload.repository);
+
+      return params;
+    }
+
+    @Override
+    public SolrRequest getV2Request() {
+      if (usev2) {
+        return new V2Request.Builder(V2ApiParams.BACKUPS_API_PATH)
+            .useBinary(useBinaryV2)
+            .withMethod(METHOD.POST)
+            .withPayload(Collections.singletonMap(V2ApiParams.LIST_BACKUPS_CMD, listPayload))
+            .build();
+      }
+      return this;
+    }
+
+    @Override
+    protected CollectionAdminResponse createResponse(SolrClient client) {
+      return new CollectionAdminResponse();
+    }
+  }
+
+  /**
    * Returns a SolrRequest to add a property to a specific replica
    */
   public static AddReplicaProp addReplicaProperty(String collection, String shard, String replica,
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/DeleteBackupPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/DeleteBackupPayload.java
new file mode 100644
index 0000000..3bac775
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/DeleteBackupPayload.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.request.beans;
+
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+
+public class DeleteBackupPayload implements ReflectMapWriter {
+
+  @JsonProperty(required = true)
+  public String name;
+
+  @JsonProperty
+  public String location;
+
+  @JsonProperty
+  public String repository;
+
+  @JsonProperty
+  public Integer backupId;
+
+  @JsonProperty
+  public Integer maxNumBackupPoints;
+
+  @JsonProperty
+  public Boolean purgeUnused;
+
+  @JsonProperty
+  public String async;
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/ListBackupPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/ListBackupPayload.java
new file mode 100644
index 0000000..408f12a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/ListBackupPayload.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.request.beans;
+
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+
+public class ListBackupPayload implements ReflectMapWriter {
+
+  @JsonProperty
+  public String name;
+
+  @JsonProperty
+  public String location;
+
+  @JsonProperty
+  public String repository;
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index 291ff94..8acb63e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -143,6 +143,11 @@ public abstract class CoreAdminParams
   public static final String BACKUP_ID = "backupId";
 
   /**
+   * A parameter to specify whether to purge (delete) all index files, shard-backup metadata files, and backup property files that are unreachable, incomplete, or corrupted.
+   */
+  public static final String BACKUP_PURGE_UNUSED = "purgeUnused";
+
+  /**
    * A parameter to specify whether incremental backup is used
    */
   public static final String BACKUP_INCREMENTAL = "incremental";
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/V2ApiParams.java b/solr/solrj/src/java/org/apache/solr/common/params/V2ApiParams.java
new file mode 100644
index 0000000..189e02c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/params/V2ApiParams.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.params;
+
+/**
+ * String constants used in sending and receiving V2 API requests.
+ */
+public class V2ApiParams {
+  private V2ApiParams() { /* Private ctor prevents instantiation */ }
+
+  public static final String COLLECTIONS_API_PATH = "/collections";
+  public static final String C_API_PATH = "/c";
+
+  public static final String BACKUPS_API_PATH = COLLECTIONS_API_PATH + "/backups";
+  public static final String BACKUPS_API_SHORT_PATH = C_API_PATH + "/backups";
+
+  public static final String LIST_BACKUPS_CMD = "list-backups";
+  public static final String DELETE_BACKUPS_CMD = "delete-backups";
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestCollectionAdminRequest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestCollectionAdminRequest.java
index 2024d9b..9b6b461 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestCollectionAdminRequest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestCollectionAdminRequest.java
@@ -16,9 +16,15 @@
  */
 package org.apache.solr.client.solrj.request;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.solr.SolrTestCase;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.CreateAlias;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.CreateShard;
+import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.junit.Test;
 
@@ -87,4 +93,41 @@ public class TestCollectionAdminRequest extends SolrTestCase {
     assertTrue(exceptionMessage.contains("invalid$shard@name"));
     assertTrue(exceptionMessage.contains("must consist entirely of periods, underscores, hyphens, and alphanumerics"));
   }
+
+  @Test
+  public void testDeleteBackupsV2Mapping() {
+    final V2Request deleteBackupRequest = getV2Request(CollectionAdminRequest.deleteBackupById("someBackupName", 1));
+
+    assertEquals(SolrRequest.METHOD.POST, deleteBackupRequest.getMethod());
+    assertEquals("/collections/backups", deleteBackupRequest.getPath());
+    assertEquals("{\"delete-backups\":{\"name\":\"someBackupName\",\"backupId\":1}}",
+        getRequestBody(deleteBackupRequest, ClientUtils.TEXT_JSON));
+  }
+
+  @Test
+  public void testListBackupsV2Mapping() {
+    final V2Request listBackupRequest = getV2Request(CollectionAdminRequest.listBackup("backupName"));
+
+    assertEquals(SolrRequest.METHOD.POST, listBackupRequest.getMethod());
+    assertEquals("/collections/backups", listBackupRequest.getPath());
+    assertEquals("{\"list-backups\":{\"name\":\"backupName\"}}",
+        getRequestBody(listBackupRequest, ClientUtils.TEXT_JSON));
+  }
+
+  private V2Request getV2Request(CollectionAdminRequest request) {
+    request.setUseV2(true);
+    return (V2Request) request.getV2Request();
+  }
+
+  private String getRequestBody(V2Request request, String contentType) {
+    final ByteArrayOutputStream os = new ByteArrayOutputStream();
+    try {
+      request.getContentWriter(contentType).write(os);
+      return new String(os.toByteArray(), StandardCharsets.UTF_8);
+
+    } catch (IOException e) {
+      /* Unreachable in practice, since we're not doing any I/O here */
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java b/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
index 104aac8..067f324 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.util.ArrayList;
@@ -137,20 +138,21 @@ public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
         CollectionAdminRequest
                 .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
                 .process(solrClient);
-        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        int totalIndexedDocs = indexDocs(backupCollectionName, true);
         String backupName = BACKUPNAME_PREFIX + testSuffix;
         try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
                 .newBackupRepository(Optional.of(BACKUP_REPO_NAME))) {
             String backupLocation = repository.getBackupLocation(getBackupLocation());
             long t = System.nanoTime();
+            int expectedDocsForFirstBackup = totalIndexedDocs;
             CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
                     .setLocation(backupLocation)
                     .setIncremental(true)
                     .setRepositoryName(BACKUP_REPO_NAME)
                     .processAndWait(cluster.getSolrClient(), 100);
             long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
-            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
-            expectedNumDocs += indexDocs(backupCollectionName, true);
+            log.info("Created backup with {} docs, took {}ms", totalIndexedDocs, timeTaken);
+            totalIndexedDocs += indexDocs(backupCollectionName, true);
 
             t = System.nanoTime();
             CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
@@ -171,7 +173,7 @@ public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
             log.info("Restored from backup, took {}ms", timeTaken);
             numFound = cluster.getSolrClient().query(restoreCollectionName,
                     new SolrQuery("*:*")).getResults().getNumFound();
-            assertEquals(expectedNumDocs, numFound);
+            assertEquals(expectedDocsForFirstBackup, numFound);
         }
     }
 
@@ -196,8 +198,8 @@ public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
         try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
                 .newBackupRepository(Optional.of(BACKUP_REPO_NAME))) {
             String backupLocation = repository.getBackupLocation(getBackupLocation());
-            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
-            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            URI fullBackupLocationURI = repository.resolve(repository.createURI(backupLocation), backupName, getCollectionName());
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, fullBackupLocationURI);
             IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
 
             backupRestoreThenCheck(solrClient, verifier);
@@ -208,13 +210,63 @@ public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
             for (int i = 0; i < 15; i++) {
                 indexDocs(getCollectionName(), 5,false);
             }
-            backupRestoreThenCheck(solrClient, verifier);
 
+            backupRestoreThenCheck(solrClient, verifier);
             indexDocs(getCollectionName(), false);
             backupRestoreThenCheck(solrClient, verifier);
 
+            // test list backups
+            CollectionAdminResponse resp =
+                    CollectionAdminRequest.listBackup(backupName)
+                            .setBackupLocation(backupLocation)
+                            .setBackupRepository(BACKUP_REPO_NAME)
+                            .process(cluster.getSolrClient());
+            ArrayList backups = (ArrayList) resp.getResponse().get("backups");
+            assertEquals(3, backups.size());
+
+            // test delete backups
+            resp = CollectionAdminRequest.deleteBackupByRecency(backupName, 4)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+            assertEquals(null, resp.getResponse().get("deleted"));
+
+            resp =  CollectionAdminRequest.deleteBackupByRecency(backupName, 3)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+            assertEquals(null, resp.getResponse().get("deleted"));
+
+            resp = CollectionAdminRequest.deleteBackupByRecency(backupName, 2)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+            assertEquals(1, resp.getResponse()._get("deleted[0]/backupId", null));
+
+            resp = CollectionAdminRequest.deleteBackupById(backupName, 3)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+            assertEquals(3, resp.getResponse()._get("deleted[0]/backupId", null));
+
+
             simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
 
+            // test purge backups
+            // purge first, since corrupted files may already have been uploaded
+            resp = CollectionAdminRequest.deleteBackupPurgeUnusedFiles(backupName)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+
+            addDummyFileToIndex(repository, backupPaths.getIndexDir(), "dummy-files-1");
+            addDummyFileToIndex(repository, backupPaths.getIndexDir(), "dummy-files-2");
+            resp = CollectionAdminRequest.deleteBackupPurgeUnusedFiles(backupName)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .setLocation(backupLocation)
+                    .process(cluster.getSolrClient());
+            assertEquals(2, ((NamedList)resp.getResponse().get("deleted")).get("numIndexFiles"));
+
             new UpdateRequest()
                     .deleteByQuery("*:*")
                     .commit(cluster.getSolrClient(), getCollectionName());
@@ -269,6 +321,14 @@ public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
         }
     }
 
+    private void addDummyFileToIndex(BackupRepository repository, URI indexDir, String fileName) throws IOException { // test helper: creates a file named fileName under indexDir through the given repository
+        try (OutputStream os = repository.createOutput(repository.resolve(indexDir, fileName))){ // try-with-resources closes the stream once the bytes are written
+            os.write(100); // the file content is three fixed bytes: 100, 101, 102
+            os.write(101);
+            os.write(102);
+        }
+    }
+
     private void backupRestoreThenCheck(CloudSolrClient solrClient,
                                         IncrementalBackupVerifier verifier) throws Exception {
         verifier.incrementalBackupThenVerify();