Posted to issues@lucene.apache.org by GitBox <gi...@apache.org> on 2021/01/26 20:17:34 UTC

[GitHub] [lucene-solr] muse-dev[bot] commented on a change in pull request #2250: SOLR-13608: Incremental backup file format

muse-dev[bot] commented on a change in pull request #2250:
URL: https://github.com/apache/lucene-solr/pull/2250#discussion_r564804612



##########
File path: solr/core/src/java/org/apache/solr/core/backup/ShardBackupMetadata.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core.backup;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.util.PropertiesInputStream;
+
+/**
+ * Represents the shard-backup metadata file.
+ *
+ * The shard-backup metadata file is responsible for holding information about a specific backup-point for a specific
+ * shard.  This includes the full list of index files required to restore this shard to the backup-point, with pointers
+ * to where each lives in the repository.
+ *
+ * Shard backup metadata files have names derived from an associated {@link ShardBackupId}, to avoid conflicts between
+ * shards and backupIds.
+ *
+ * Not used by the (now deprecated) traditional 'full-snapshot' backup format.
+ */
+public class ShardBackupMetadata {
+    private Map<String, BackedFile> allFiles = new HashMap<>();
+    private List<String> uniqueFileNames = new ArrayList<>();
+
+    public void addBackedFile(String uniqueFileName, String originalFileName, Checksum fileChecksum) {
+        addBackedFile(new BackedFile(uniqueFileName, originalFileName, fileChecksum));
+    }
+
+    public int numFiles() {
+        return uniqueFileNames.size();
+    }
+
+    public long totalSize() {
+        return allFiles.values().stream().map(bf -> bf.fileChecksum.size).reduce(0L, Long::sum);
+    }
+
+    public void addBackedFile(BackedFile backedFile) {
+        allFiles.put(backedFile.originalFileName, backedFile);
+        uniqueFileNames.add(backedFile.uniqueFileName);
+    }
+
+    public Optional<BackedFile> getFile(String originalFileName) {
+        return Optional.ofNullable(allFiles.get(originalFileName));
+    }
+
+    public List<String> listUniqueFileNames() {
+        return Collections.unmodifiableList(uniqueFileNames);
+    }
+
+    public static ShardBackupMetadata empty() {
+        return new ShardBackupMetadata();
+    }
+
+    public static ShardBackupMetadata from(BackupRepository repository, URI dir, ShardBackupId shardBackupId) throws IOException {
+        final String shardBackupMetadataFilename = shardBackupId.getBackupMetadataFilename();
+        if (!repository.exists(repository.resolve(dir, shardBackupMetadataFilename))) {
+            return null;
+        }
+
+        try (IndexInput is = repository.openInput(dir, shardBackupMetadataFilename, IOContext.DEFAULT)) {
+            return from(new PropertiesInputStream(is));
+        }
+    }
+
+    /**
+     * Stores this ShardBackupMetadata under {@code folderURI}, using a filename derived from {@code shardBackupId}.
+     * If a file already exists there, it is overwritten.
+     */
+    public void store(BackupRepository repository, URI folderURI, ShardBackupId shardBackupId) throws IOException {
+        final String filename = shardBackupId.getBackupMetadataFilename();
+        URI fileURI = repository.resolve(folderURI, filename);
+        if (repository.exists(fileURI)) {
+            repository.delete(folderURI, Collections.singleton(filename), true);
+        }
+
+        try (OutputStream os = repository.createOutput(repository.resolve(folderURI, filename))) {
+            store(os);
+        }
+    }
+
+    public Collection<String> listOriginalFileNames() {
+        return Collections.unmodifiableSet(allFiles.keySet());
+    }
+
+    private void store(OutputStream os) throws IOException {
+        @SuppressWarnings({"rawtypes"})
+        Map<String, Map> map = new HashMap<>();
+
+        for (BackedFile backedFile : allFiles.values()) {
+            Map<String, Object> fileMap = new HashMap<>();
+            fileMap.put("fileName", backedFile.originalFileName);
+            fileMap.put("checksum", backedFile.fileChecksum.checksum);
+            fileMap.put("size", backedFile.fileChecksum.size);
+            map.put(backedFile.uniqueFileName, fileMap);
+        }
+
+        Utils.writeJson(map, os, false);
+    }
+
+    private static ShardBackupMetadata from(InputStream is) {
+        @SuppressWarnings({"unchecked"})
+        Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(is);
+        ShardBackupMetadata shardBackupMetadata = new ShardBackupMetadata();
+        for (String uniqueFileName : map.keySet()) {
+            @SuppressWarnings({"unchecked"})
+            Map<String, Object> fileMap = (Map<String, Object>) map.get(uniqueFileName);

Review comment:
       *INEFFICIENT_KEYSET_ITERATOR:*  Accessing a value using a key that was retrieved from a `keySet` iterator. It is more efficient to use an iterator on the `entrySet` of the map, avoiding the extra `HashMap.get(key)` lookup.
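
For illustration, a minimal sketch of the entrySet-based loop this finding suggests (not part of the PR; it assumes the same map layout written by store(OutputStream) above, with hypothetical variable names):

```java
// Iterate over entrySet() so each entry's value is read directly,
// avoiding the extra HashMap.get(key) lookup flagged by the finding.
for (Map.Entry<String, Object> entry : map.entrySet()) {
    String uniqueFileName = entry.getKey();
    @SuppressWarnings({"unchecked"})
    Map<String, Object> fileMap = (Map<String, Object>) entry.getValue();
    String originalFileName = (String) fileMap.get("fileName");
    // ... build the Checksum/BackedFile from fileMap and register it, as in the PR ...
}
```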

##########
File path: solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
##########
@@ -154,19 +194,103 @@ private Replica selectReplicaWithSnapshot(CollectionSnapshotMetaData snapshotMet
     }
 
     Optional<Replica> r = slice.getReplicas().stream()
-                               .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
-                               .findFirst();
+            .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
+            .findFirst();
 
     if (!r.isPresent()) {
       throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
+              "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
     }
 
     return r.get();
   }
 
+  private void incrementalCopyIndexFiles(URI backupPath, String collectionName, ZkNodeProps request,
+                                         NamedList<Object> results, BackupProperties backupProperties,
+                                         BackupManager backupManager) throws IOException {
+    String backupName = request.getStr(NAME);
+    String asyncId = request.getStr(ASYNC);
+    String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
+            backupPath);
+
+    Optional<BackupProperties> previousProps = backupManager.tryReadBackupProperties();
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+
+    Collection<Slice> slices = ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices();
+    for (Slice slice : slices) {
+      // Note: this can return null when there is no leader for this shard.
+      Replica replica = slice.getLeader();
+      if (replica == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "No 'leader' replica available for shard " + slice.getName() + " of collection " + collectionName);
+      }
+      String coreName = replica.getStr(CORE_NAME_PROP);
+
+      ModifiableSolrParams params = coreBackupParams(backupPath, repoName, slice, coreName, true /* incremental backup */);
+      params.set(CoreAdminParams.BACKUP_INCREMENTAL, true);
+      previousProps.flatMap(bp -> bp.getShardBackupIdFor(slice.getName()))
+              .ifPresent(prevBackupPoint -> params.set(CoreAdminParams.PREV_SHARD_BACKUP_ID, prevBackupPoint.getIdAsString()));
+
+      ShardBackupId shardBackupId = backupProperties.putAndGetShardBackupIdFor(slice.getName(),
+              backupManager.getBackupId().getId());
+      params.set(CoreAdminParams.SHARD_BACKUP_ID, shardBackupId.getIdAsString());
+
+      shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
+      log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
+    }
+    log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
+
+    String msgOnError = "Could not backup all shards";
+    shardRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+    if (results.get("failure") != null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError);
+    }
+
+    //Aggregating result from different shards
+    @SuppressWarnings({"rawtypes"})
+    NamedList aggRsp = aggregateResults(results, collectionName, backupManager, backupProperties, slices);
+    results.add("response", aggRsp);
+  }
+
+  @SuppressWarnings({"rawtypes"})
+  private NamedList aggregateResults(NamedList results, String collectionName,
+                                     BackupManager backupManager,
+                                     BackupProperties backupProps,
+                                     Collection<Slice> slices) {
+    NamedList<Object> aggRsp = new NamedList<>();
+    aggRsp.add("collection", collectionName);
+    aggRsp.add("numShards", slices.size());
+    aggRsp.add("backupId", backupManager.getBackupId().id);
+    aggRsp.add("indexVersion", backupProps.getIndexVersion());
+    aggRsp.add("startTime", backupProps.getStartTime());
+
+    double indexSizeMB = 0;
+    NamedList shards = (NamedList) results.get("success");
+    for (int i = 0; i < shards.size(); i++) {
+      NamedList shardResp = (NamedList)((NamedList)shards.getVal(i)).get("response");
+      if (shardResp == null)
+        continue;
+      indexSizeMB += (double) shardResp.get("indexSizeMB");

Review comment:
       *NULLPTR_DEREFERENCE:*  accessing memory that is the null pointer on line 275 indirectly during the call to `NamedList.get(...)`.
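
A hedged sketch of a null-tolerant version of the aggregation loop (not part of the PR; it reuses the "success"/"response"/"indexSizeMB" keys from aggregateResults(...) above):

```java
// Guard against a missing "success" entry and a missing per-shard size
// before dereferencing, instead of assuming both are always present.
double indexSizeMB = 0;
NamedList shards = (NamedList) results.get("success");
if (shards != null) {
    for (int i = 0; i < shards.size(); i++) {
        NamedList shardResp = (NamedList) ((NamedList) shards.getVal(i)).get("response");
        if (shardResp == null) {
            continue;
        }
        Object size = shardResp.get("indexSizeMB");
        if (size instanceof Double) {
            indexSizeMB += (double) size;
        }
    }
}
```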

##########
File path: solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
##########
@@ -193,135 +330,107 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
         propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, newSlices);
       }
 
-      ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
+      ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(propMap), new NamedList());
       // note: when createCollection() returns, the collection exists (no race)
     }
 
-    // Restore collection properties
-    backupMgr.uploadCollectionProperties(location, backupName, restoreCollectionName);
-
-    DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
     //Mark all shards in CONSTRUCTION STATE while we restore the data
-    {
+    private void markAllShardsAsConstruction(DocCollection restoreCollection) throws KeeperException, InterruptedException {
       //TODO might instead createCollection accept an initial state?  Is there a race?
       Map<String, Object> propMap = new HashMap<>();
       propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
       for (Slice shard : restoreCollection.getSlices()) {
         propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
       }
-      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollection.getName());
       ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
     }
 
-    // TODO how do we leverage the RULE / SNITCH logic in createCollection?
-
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    List<String> sliceNames = new ArrayList<>();
-    restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
-
-    Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
-            .forCollection(restoreCollectionName)
-            .forShard(sliceNames)
-            .assignNrtReplicas(numNrtReplicas)
-            .assignTlogReplicas(numTlogReplicas)
-            .assignPullReplicas(numPullReplicas)
-            .onNodes(nodeList)
-            .build();
-    Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
-        ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
-        clusterState, restoreCollection);
-    List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
+    private List<ReplicaPosition> getReplicaPositions(DocCollection restoreCollection, List<String> nodeList, ClusterState clusterState, List<String> sliceNames) throws IOException, InterruptedException {
+      Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
+              .forCollection(restoreCollection.getName())
+              .forShard(sliceNames)
+              .assignNrtReplicas(numNrtReplicas)
+              .assignTlogReplicas(numTlogReplicas)
+              .assignPullReplicas(numPullReplicas)
+              .onNodes(nodeList)
+              .build();
+      Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
+              ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
+              clusterState, restoreCollection);
+      return assignStrategy.assign(ocmh.cloudManager, assignRequest);
+    }
 
-    CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void createSingleReplicaPerShard(NamedList results,
+                                             DocCollection restoreCollection,
+                                             String asyncId,
+                                             ClusterState clusterState, List<ReplicaPosition> replicaPositions) throws Exception {
+      CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
 
-    //Create one replica per shard and copy backed up data to it
-    for (Slice slice : restoreCollection.getSlices()) {
-      if (log.isInfoEnabled()) {
-        log.info("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
-      }
-      HashMap<String, Object> propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
-      propMap.put(COLLECTION_PROP, restoreCollectionName);
-      propMap.put(SHARD_ID_PROP, slice.getName());
-
-      if (numNrtReplicas >= 1) {
-        propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
-      } else if (numTlogReplicas >= 1) {
-        propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
-                Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
-      }
-
-      // Get the first node matching the shard to restore in
-      String node;
-      for (ReplicaPosition replicaPosition : replicaPositions) {
-        if (Objects.equals(replicaPosition.shard, slice.getName())) {
-          node = replicaPosition.node;
-          propMap.put(CoreAdminParams.NODE, node);
-          replicaPositions.remove(replicaPosition);
-          break;
+      //Create one replica per shard and copy backed up data to it
+      for (Slice slice : restoreCollection.getSlices()) {
+        String sliceName = slice.getName();
+        log.info("Adding replica for shard={} collection={} ", sliceName, restoreCollection);
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
+        propMap.put(COLLECTION_PROP, restoreCollection.getName());
+        propMap.put(SHARD_ID_PROP, sliceName);
+
+        if (numNrtReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+        } else if (numTlogReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+        } else {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
+                  Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
         }
-      }
 
-      // add async param
-      if (asyncId != null) {
-        propMap.put(ASYNC, asyncId);
-      }
-      ocmh.addPropertyParams(message, propMap);
-      final NamedList addReplicaResult = new NamedList();
-      ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addReplicaResult, () -> {
-        Object addResultFailure = addReplicaResult.get("failure");
-        if (addResultFailure != null) {
-          SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
-          if (failure == null) {
-            failure = new SimpleOrderedMap();
-            results.add("failure", failure);
+        // Get the first node matching the shard to restore in
+        String node;
+        for (ReplicaPosition replicaPosition : replicaPositions) {
+          if (Objects.equals(replicaPosition.shard, sliceName)) {
+            node = replicaPosition.node;
+            propMap.put(CoreAdminParams.NODE, node);
+            replicaPositions.remove(replicaPosition);
+            break;
           }
-          failure.addAll((NamedList) addResultFailure);
-        } else {
-          SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
-          if (success == null) {
-            success = new SimpleOrderedMap();
-            results.add("success", success);
-          }
-          success.addAll((NamedList) addReplicaResult.get("success"));
         }
-        countDownLatch.countDown();
-      });
-    }
 
-    boolean allIsDone = countDownLatch.await(1, TimeUnit.HOURS);
-    if (!allIsDone) {
-      throw new TimeoutException("Initial replicas were not created within 1 hour. Timing out.");
-    }
-    Object failures = results.get("failure");
-    if (failures != null && ((SimpleOrderedMap) failures).size() > 0) {
-      log.error("Restore failed to create initial replicas.");
-      ocmh.cleanupCollection(restoreCollectionName, new NamedList<Object>());
-      return;
-    }
-
-    //refresh the location copy of collection state
-    restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+        // add async param
+        if (asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        ocmh.addPropertyParams(message, propMap);
+        final NamedList addReplicaResult = new NamedList();
+        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addReplicaResult, () -> {
+          Object addResultFailure = addReplicaResult.get("failure");
+          if (addResultFailure != null) {
+            SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
+            if (failure == null) {
+              failure = new SimpleOrderedMap();
+              results.add("failure", failure);
+            }
+            failure.addAll((NamedList) addResultFailure);
+          } else {
+            SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
+            if (success == null) {
+              success = new SimpleOrderedMap();
+              results.add("success", success);
+            }
+            success.addAll((NamedList) addReplicaResult.get("success"));

Review comment:
       *NULLPTR_DEREFERENCE:*  call to `NamedList.addAll(...)` eventually accesses memory that is the null pointer on line 421 indirectly during the call to `NamedList.get(...)`.
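
One possible guard for the merge step in the addReplica callback, sketched under the assumption that the NamedList shapes are as in the snippet above (not part of the PR):

```java
// Only merge the per-replica "success" entry into the overall results
// when addReplica actually produced one, so addAll(...) never sees null.
NamedList addReplicaSuccess = (NamedList) addReplicaResult.get("success");
if (addReplicaSuccess != null) {
    SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
    if (success == null) {
        success = new SimpleOrderedMap();
        results.add("success", success);
    }
    success.addAll(addReplicaSuccess);
}
countDownLatch.countDown();
```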

##########
File path: solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupProperties;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+
+/**
+ * Used to test the incremental method of backup/restoration (as opposed to the deprecated 'full snapshot' method).
+ *
+ * For a similar test harness for snapshot backup/restoration see {@link AbstractCloudBackupRestoreTestCase}
+ */
+public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static long docsSeed; // see indexDocs()
+    protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+    protected static final String BACKUPNAME_PREFIX = "mytestbackup";
+    protected static final String BACKUP_REPO_NAME = "trackingBackupRepository";
+
+    protected String testSuffix = "test1";
+    protected int replFactor;
+    protected int numTlogReplicas;
+    protected int numPullReplicas;
+
+    @BeforeClass
+    public static void createCluster() throws Exception {
+        docsSeed = random().nextLong();
+        System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    }
+
+    /**
+     * @return The name of the collection to use.
+     */
+    public abstract String getCollectionNamePrefix();
+
+    public String getCollectionName(){
+        return getCollectionNamePrefix() + "_" + testSuffix;
+    }
+
+    public void setTestSuffix(String testSuffix) {
+        this.testSuffix = testSuffix;
+    }
+
+    private void randomizeReplicaTypes() {
+        replFactor = TestUtil.nextInt(random(), 1, 2);
+//    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+//    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    }
+
+    /**
+     * @return The absolute path for the backup location.
+     *         Could return null.
+     */
+    public abstract String getBackupLocation();
+
+    @Test
+    public void testSimple() throws Exception {
+        final String backupCollectionName = getCollectionName();
+        final String restoreCollectionName = backupCollectionName + "_restore";
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupincsimple");
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
+                .process(solrClient);
+        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            long t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
+            expectedNumDocs += indexDocs(backupCollectionName, true);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            long numFound = cluster.getSolrClient().query(backupCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            log.info("Created backup with {} docs, took {}ms", numFound, timeTaken);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                    .setBackupId(0)
+                    .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).processAndWait(solrClient, 500);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Restored from backup, took {}ms", timeTaken);
+            numFound = cluster.getSolrClient().query(restoreCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            assertEquals(expectedNumDocs, numFound);
+        }
+    }
+
+    @Test
+    @Slow
+    @SuppressWarnings("rawtypes")
+    public void testBackupIncremental() throws Exception {
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupinc");
+        randomizeReplicaTypes();
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+                .process(solrClient);
+
+        indexDocs(getCollectionName(), false);
+
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
+
+            backupRestoreThenCheck(solrClient, verifier);
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            // adding more commits to trigger merging segments
+            for (int i = 0; i < 15; i++) {
+                indexDocs(getCollectionName(), 5,false);
+            }
+            backupRestoreThenCheck(solrClient, verifier);
+
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+            new UpdateRequest()
+                    .deleteByQuery("*:*")
+                    .commit(cluster.getSolrClient(), getCollectionName());
+            indexDocs(getCollectionName(), false);
+            // corrupt index files
+            corruptIndexFiles();
+            try {
+                log.info("Create backup after corrupt index files");
+                CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                        .setLocation(backupLocation)
+                        .setIncremental(true)
+                        .setMaxNumberBackupPoints(3)
+                        .setRepositoryName(BACKUP_REPO_NAME);
+                if (random().nextBoolean()) {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    if (state != RequestStatusState.FAILED) {
+                        fail("This backup should be failed");
+                    }
+                } else {
+                    CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                    fail("This backup should be failed");
+                }
+            } catch (Exception e) {
+                // expected
+                e.printStackTrace();
+            }
+        }
+    }
+
+    protected void corruptIndexFiles() throws IOException {
+        Collection<Slice> slices = getCollectionState(getCollectionName()).getSlices();
+        Slice slice = slices.iterator().next();
+        JettySolrRunner leaderNode = cluster.getReplicaJetty(slice.getLeader());

Review comment:
       *NULLPTR_DEREFERENCE:*  call to `MiniSolrCloudCluster.getReplicaJetty(...)` eventually accesses memory that is the null pointer on line 267 indirectly during the call to `MiniSolrCloudCluster.getReplicaJetty(...)`.
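
A hypothetical defensive variant of this test helper (not part of the PR): slice.getLeader() can return null when no leader is elected, so an explicit assertion gives a clearer test failure than the eventual NPE inside getReplicaJetty(...).

```java
// Fail fast with a descriptive message when the shard has no elected leader.
Slice slice = getCollectionState(getCollectionName()).getSlices().iterator().next();
Replica leader = slice.getLeader();
assertNotNull("No leader elected for shard " + slice.getName(), leader);
JettySolrRunner leaderNode = cluster.getReplicaJetty(leader);
```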

##########
File path: solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
##########
@@ -193,135 +330,107 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
         propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, newSlices);
       }
 
-      ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
+      ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(propMap), new NamedList());
       // note: when createCollection() returns, the collection exists (no race)
     }
 
-    // Restore collection properties
-    backupMgr.uploadCollectionProperties(location, backupName, restoreCollectionName);
-
-    DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
     //Mark all shards in CONSTRUCTION STATE while we restore the data
-    {
+    private void markAllShardsAsConstruction(DocCollection restoreCollection) throws KeeperException, InterruptedException {
       //TODO might instead createCollection accept an initial state?  Is there a race?
       Map<String, Object> propMap = new HashMap<>();
       propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
       for (Slice shard : restoreCollection.getSlices()) {
         propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
       }
-      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollection.getName());
       ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
     }
 
-    // TODO how do we leverage the RULE / SNITCH logic in createCollection?
-
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    List<String> sliceNames = new ArrayList<>();
-    restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
-
-    Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
-            .forCollection(restoreCollectionName)
-            .forShard(sliceNames)
-            .assignNrtReplicas(numNrtReplicas)
-            .assignTlogReplicas(numTlogReplicas)
-            .assignPullReplicas(numPullReplicas)
-            .onNodes(nodeList)
-            .build();
-    Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
-        ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
-        clusterState, restoreCollection);
-    List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
+    private List<ReplicaPosition> getReplicaPositions(DocCollection restoreCollection, List<String> nodeList, ClusterState clusterState, List<String> sliceNames) throws IOException, InterruptedException {
+      Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
+              .forCollection(restoreCollection.getName())
+              .forShard(sliceNames)
+              .assignNrtReplicas(numNrtReplicas)
+              .assignTlogReplicas(numTlogReplicas)
+              .assignPullReplicas(numPullReplicas)
+              .onNodes(nodeList)
+              .build();
+      Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
+              ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
+              clusterState, restoreCollection);
+      return assignStrategy.assign(ocmh.cloudManager, assignRequest);
+    }
 
-    CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void createSingleReplicaPerShard(NamedList results,
+                                             DocCollection restoreCollection,
+                                             String asyncId,
+                                             ClusterState clusterState, List<ReplicaPosition> replicaPositions) throws Exception {
+      CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
 
-    //Create one replica per shard and copy backed up data to it
-    for (Slice slice : restoreCollection.getSlices()) {
-      if (log.isInfoEnabled()) {
-        log.info("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
-      }
-      HashMap<String, Object> propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
-      propMap.put(COLLECTION_PROP, restoreCollectionName);
-      propMap.put(SHARD_ID_PROP, slice.getName());
-
-      if (numNrtReplicas >= 1) {
-        propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
-      } else if (numTlogReplicas >= 1) {
-        propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
-                Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
-      }
-
-      // Get the first node matching the shard to restore in
-      String node;
-      for (ReplicaPosition replicaPosition : replicaPositions) {
-        if (Objects.equals(replicaPosition.shard, slice.getName())) {
-          node = replicaPosition.node;
-          propMap.put(CoreAdminParams.NODE, node);
-          replicaPositions.remove(replicaPosition);
-          break;
+      //Create one replica per shard and copy backed up data to it
+      for (Slice slice : restoreCollection.getSlices()) {
+        String sliceName = slice.getName();
+        log.info("Adding replica for shard={} collection={} ", sliceName, restoreCollection);
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
+        propMap.put(COLLECTION_PROP, restoreCollection.getName());
+        propMap.put(SHARD_ID_PROP, sliceName);
+
+        if (numNrtReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+        } else if (numTlogReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+        } else {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
+                  Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
         }
-      }
 
-      // add async param
-      if (asyncId != null) {
-        propMap.put(ASYNC, asyncId);
-      }
-      ocmh.addPropertyParams(message, propMap);
-      final NamedList addReplicaResult = new NamedList();
-      ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addReplicaResult, () -> {
-        Object addResultFailure = addReplicaResult.get("failure");
-        if (addResultFailure != null) {
-          SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
-          if (failure == null) {
-            failure = new SimpleOrderedMap();
-            results.add("failure", failure);
+        // Get the first node matching the shard to restore in
+        String node;
+        for (ReplicaPosition replicaPosition : replicaPositions) {
+          if (Objects.equals(replicaPosition.shard, sliceName)) {
+            node = replicaPosition.node;
+            propMap.put(CoreAdminParams.NODE, node);
+            replicaPositions.remove(replicaPosition);
+            break;
           }
-          failure.addAll((NamedList) addResultFailure);
-        } else {
-          SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
-          if (success == null) {
-            success = new SimpleOrderedMap();
-            results.add("success", success);
-          }
-          success.addAll((NamedList) addReplicaResult.get("success"));
         }
-        countDownLatch.countDown();
-      });
-    }
 
-    boolean allIsDone = countDownLatch.await(1, TimeUnit.HOURS);
-    if (!allIsDone) {
-      throw new TimeoutException("Initial replicas were not created within 1 hour. Timing out.");
-    }
-    Object failures = results.get("failure");
-    if (failures != null && ((SimpleOrderedMap) failures).size() > 0) {
-      log.error("Restore failed to create initial replicas.");
-      ocmh.cleanupCollection(restoreCollectionName, new NamedList<Object>());
-      return;
-    }
-
-    //refresh the location copy of collection state
-    restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+        // add async param
+        if (asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        ocmh.addPropertyParams(message, propMap);
+        final NamedList addReplicaResult = new NamedList();
+        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addReplicaResult, () -> {
+          Object addResultFailure = addReplicaResult.get("failure");
+          if (addResultFailure != null) {
+            SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
+            if (failure == null) {
+              failure = new SimpleOrderedMap();
+              results.add("failure", failure);
+            }
+            failure.addAll((NamedList) addResultFailure);
+          } else {
+            SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
+            if (success == null) {
+              success = new SimpleOrderedMap();
+              results.add("success", success);
+            }
+            success.addAll((NamedList) addReplicaResult.get("success"));

Review comment:
       *NULL_DEREFERENCE:*  object returned by `addReplicaResult.get("success")` could be null and is dereferenced by call to `addAll(...)` at line 421.
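
An alternative, fail-fast sketch for the same spot (not part of the PR): surface a descriptive error when addReplica reports neither a failure nor a success entry. Only the final line of the else branch above changes.

```java
// Replace the unchecked cast-and-addAll with an explicit null check.
Object addReplicaSuccess = Objects.requireNonNull(addReplicaResult.get("success"),
        "addReplica reported neither 'failure' nor 'success' for shard " + sliceName);
success.addAll((NamedList) addReplicaSuccess);
```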

##########
File path: solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.handler;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.commons.math3.util.Precision;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for orchestrating the actual incremental backup process.
+ *
+ * If this is the first backup for a collection, all files are uploaded.  If previous backups exist, the most recent
+ * {@link ShardBackupMetadata} file is used to determine which files already exist in the repository and can be skipped.
+ */
+public class IncrementalShardBackup {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private SolrCore solrCore;
+
+    private BackupFilePaths incBackupFiles;
+    private BackupRepository backupRepo;
+
+    private ShardBackupId prevShardBackupId;
+    private ShardBackupId shardBackupId;
+
+    /**
+     *
+     * @param prevShardBackupId identifies the previous ShardBackupMetadata file, used to skip
+     *                             uploading index files that are already present in the repository.
+     * @param shardBackupId identifies the file where all metadata of this backup will be stored.
+     */
+    public IncrementalShardBackup(BackupRepository backupRepo, SolrCore solrCore, BackupFilePaths incBackupFiles,
+                                  ShardBackupId prevShardBackupId, ShardBackupId shardBackupId) {
+        this.backupRepo = backupRepo;
+        this.solrCore = solrCore;
+        this.incBackupFiles = incBackupFiles;
+        this.prevShardBackupId = prevShardBackupId;
+        this.shardBackupId = shardBackupId;
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public NamedList backup() throws Exception {
+        final IndexCommit indexCommit = getAndSaveIndexCommit();
+        try {
+            return backup(indexCommit);
+        } finally {
+            solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
+        }
+    }
+
+    /**
+     * Returns {@link IndexDeletionPolicyWrapper#getAndSaveLatestCommit}.
+     * <p>
+     * Note:
+     * <ul>
+     *  <li>This method does error handling when the commit can't be found and wraps them in {@link SolrException}
+     *  </li>
+     *  <li>If this method returns, the result will be non null, and the caller <em>MUST</em>
+     *      call {@link IndexDeletionPolicyWrapper#releaseCommitPoint} when finished
+     *  </li>
+     * </ul>
+     */
+    private IndexCommit getAndSaveIndexCommit() throws IOException {
+        final IndexDeletionPolicyWrapper delPolicy = solrCore.getDeletionPolicy();
+        final IndexCommit commit = delPolicy.getAndSaveLatestCommit();
+        if (null == commit) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Index does not yet have any commits for core " +
+                    solrCore.getName());
+        }
+        if (log.isDebugEnabled())   {
+            log.debug("Using latest commit: generation={}", commit.getGeneration());
+        }
+        return commit;
+    }
+
+    // note: remember to reserve the indexCommit first so it won't get deleted concurrently
+    @SuppressWarnings({"rawtypes"})
+    protected NamedList backup(final IndexCommit indexCommit) throws Exception {
+        assert indexCommit != null;
+        URI backupLocation = incBackupFiles.getBackupLocation();
+        log.info("Creating backup snapshot at {} shardBackupMetadataFile:{}", backupLocation, shardBackupId);
+        NamedList<Object> details = new NamedList<>();
+        details.add("startTime", Instant.now().toString());
+
+        Collection<String> files = indexCommit.getFileNames();
+        Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(),
+                DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+        try {
+            BackupStats stats = incrementalCopy(files, dir);
+            details.add("indexFileCount", stats.fileCount);
+            details.add("uploadedIndexFileCount", stats.uploadedFileCount);
+            details.add("indexSizeMB", stats.getIndexSizeMB());
+            details.add("uploadedIndexFileMB", stats.getTotalUploadedMB());
+        } finally {
+            solrCore.getDirectoryFactory().release(dir);
+        }
+
+        CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+        if (cd != null) {
+            details.add("shard", cd.getShardId());
+        }
+
+        details.add("endTime", Instant.now().toString());
+        details.add("shardBackupId", shardBackupId.getIdAsString());
+        log.info("Done creating backup snapshot at {} shardBackupMetadataFile:{}", backupLocation, shardBackupId);
+        return details;
+    }
+
+    private ShardBackupMetadata getPrevBackupPoint() throws IOException {
+        if (prevShardBackupId == null) {
+            return ShardBackupMetadata.empty();
+        }
+        return ShardBackupMetadata.from(backupRepo, incBackupFiles.getShardBackupMetadataDir(), prevShardBackupId);
+    }
+
+    private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir) throws IOException {
+        ShardBackupMetadata oldBackupPoint = getPrevBackupPoint();
+        ShardBackupMetadata currentBackupPoint = ShardBackupMetadata.empty();
+        URI indexDir = incBackupFiles.getIndexDir();
+        BackupStats backupStats = new BackupStats();
+
+        for(String fileName : indexFiles) {
+            Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);

Review comment:
       *NULL_DEREFERENCE:*  object `oldBackupPoint` last assigned on line 152 could be null and is dereferenced at line 158.
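
A hedged sketch of a null-tolerant getPrevBackupPoint() (not part of the PR); it assumes ShardBackupMetadata.from(...) returns null when the previous metadata file is missing, as shown in the ShardBackupMetadata snippet earlier in this review:

```java
// Fall back to an empty metadata set when the previous backup point
// cannot be found, so incrementalCopy(...) never sees a null oldBackupPoint.
private ShardBackupMetadata getPrevBackupPoint() throws IOException {
    if (prevShardBackupId == null) {
        return ShardBackupMetadata.empty();
    }
    ShardBackupMetadata prev = ShardBackupMetadata.from(backupRepo,
            incBackupFiles.getShardBackupMetadataDir(), prevShardBackupId);
    return prev == null ? ShardBackupMetadata.empty() : prev;
}
```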

##########
File path: solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupProperties;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+
+/**
+ * Used to test the incremental method of backup/restoration (as opposed to the deprecated 'full snapshot' method).
+ *
+ * For a similar test harness for snapshot backup/restoration see {@link AbstractCloudBackupRestoreTestCase}
+ */
+public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static long docsSeed; // see indexDocs()
+    protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+    protected static final String BACKUPNAME_PREFIX = "mytestbackup";
+    protected static final String BACKUP_REPO_NAME = "trackingBackupRepository";
+
+    protected String testSuffix = "test1";
+    protected int replFactor;
+    protected int numTlogReplicas;
+    protected int numPullReplicas;
+
+    @BeforeClass
+    public static void createCluster() throws Exception {
+        docsSeed = random().nextLong();
+        System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    }
+
+    /**
+     * @return The name of the collection to use.
+     */
+    public abstract String getCollectionNamePrefix();
+
+    public String getCollectionName(){
+        return getCollectionNamePrefix() + "_" + testSuffix;
+    }
+
+    public void setTestSuffix(String testSuffix) {
+        this.testSuffix = testSuffix;
+    }
+
+    private void randomizeReplicaTypes() {
+        replFactor = TestUtil.nextInt(random(), 1, 2);
+//    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+//    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    }
+
+    /**
+     * @return The absolute path for the backup location.
+     *         Could return null.
+     */
+    public abstract String getBackupLocation();
+
+    @Test
+    public void testSimple() throws Exception {
+        final String backupCollectionName = getCollectionName();
+        final String restoreCollectionName = backupCollectionName + "_restore";
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupincsimple");
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
+                .process(solrClient);
+        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            long t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
+            expectedNumDocs += indexDocs(backupCollectionName, true);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            long numFound = cluster.getSolrClient().query(backupCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            log.info("Created backup with {} docs, took {}ms", numFound, timeTaken);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                    .setBackupId(0)
+                    .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).processAndWait(solrClient, 500);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Restored from backup, took {}ms", timeTaken);
+            numFound = cluster.getSolrClient().query(restoreCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            assertEquals(expectedNumDocs, numFound);
+        }
+    }
+
+    @Test
+    @Slow
+    @SuppressWarnings("rawtypes")
+    public void testBackupIncremental() throws Exception {
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupinc");
+        randomizeReplicaTypes();
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+                .process(solrClient);
+
+        indexDocs(getCollectionName(), false);
+
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
+
+            backupRestoreThenCheck(solrClient, verifier);
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            // adding more commits to trigger merging segments
+            for (int i = 0; i < 15; i++) {
+                indexDocs(getCollectionName(), 5, false);
+            }
+            backupRestoreThenCheck(solrClient, verifier);
+
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+            new UpdateRequest()
+                    .deleteByQuery("*:*")
+                    .commit(cluster.getSolrClient(), getCollectionName());
+            indexDocs(getCollectionName(), false);
+            // corrupt index files
+            corruptIndexFiles();
+            try {
+                log.info("Create backup after corrupt index files");
+                CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                        .setLocation(backupLocation)
+                        .setIncremental(true)
+                        .setMaxNumberBackupPoints(3)
+                        .setRepositoryName(BACKUP_REPO_NAME);
+                if (random().nextBoolean()) {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    if (state != RequestStatusState.FAILED) {
+                        fail("This backup should be failed");
+                    }
+                } else {
+                    CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                    fail("This backup should be failed");
+                }
+            } catch (Exception e) {
+                // expected
+                e.printStackTrace();
+            }
+        }
+    }
+
+    protected void corruptIndexFiles() throws IOException {
+        Collection<Slice> slices = getCollectionState(getCollectionName()).getSlices();
+        Slice slice = slices.iterator().next();
+        JettySolrRunner leaderNode = cluster.getReplicaJetty(slice.getLeader());
+
+        SolrCore solrCore = leaderNode.getCoreContainer().getCore(slice.getLeader().getCoreName());
+        Set<String> fileNames = new HashSet<>(solrCore.getDeletionPolicy().getLatestCommit().getFileNames());
+        File indexFolder = new File(solrCore.getIndexDir());
+        File fileGetCorrupted = Stream.of(Objects.requireNonNull(indexFolder.listFiles()))
+                .filter(x -> fileNames.contains(x.getName()))
+                .findAny().get();
+        try (FileInputStream fis = new FileInputStream(fileGetCorrupted)){
+            byte[] contents = IOUtils.readFully(fis, (int) fileGetCorrupted.length());
+            contents[contents.length - CodecUtil.footerLength() - 1] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 2] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 3] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 4] += 1;
+            try (FileOutputStream fos = new FileOutputStream(fileGetCorrupted)) {
+                IOUtils.write(contents, fos);
+            }
+        } finally {
+            solrCore.close();
+        }
+    }
+
+    private void backupRestoreThenCheck(CloudSolrClient solrClient,
+                                        IncrementalBackupVerifier verifier) throws Exception {
+        verifier.incrementalBackupThenVerify();
+
+        if (random().nextBoolean()) {
+            simpleRestoreAndCheckDocCount(solrClient, verifier.backupLocation, verifier.backupName);
+        }
+    }
+
+    private void simpleRestoreAndCheckDocCount(CloudSolrClient solrClient, String backupLocation, String backupName) throws Exception{
+        Map<String, Integer> origShardToDocCount = AbstractCloudBackupRestoreTestCase.getShardToDocCountMap(solrClient, getCollectionState(getCollectionName()));
+
+        String restoreCollectionName = getCollectionName() + "_restored";
+
+        CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).process(solrClient);
+
+        AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+                restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
+
+        // check num docs are the same
+        assertEquals(origShardToDocCount, AbstractCloudBackupRestoreTestCase.getShardToDocCountMap(solrClient, getCollectionState(restoreCollectionName)));
+
+        // this method may get invoked multiple times, so the restored collection must be cleaned up
+        CollectionAdminRequest.deleteCollection(restoreCollectionName).process(solrClient);
+    }
+
+
+    private void indexDocs(String collectionName, int numDocs, boolean useUUID) throws Exception {
+        Random random = new Random(docsSeed);
+
+        List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+        for (int i=0; i<numDocs; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", (useUUID ? java.util.UUID.randomUUID().toString() : i));
+            doc.addField("shard_s", "shard" + (1 + random.nextInt(NUM_SHARDS))); // for implicit router
+            docs.add(doc);
+        }
+
+        CloudSolrClient client = cluster.getSolrClient();
+        client.add(collectionName, docs); //batch
+        client.commit(collectionName);
+
+        log.info("Indexed {} docs to collection: {}", numDocs, collectionName);
+    }
+
+    private int indexDocs(String collectionName, boolean useUUID) throws Exception {
+        Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
+        int numDocs = random.nextInt(100) + 5;
+        indexDocs(collectionName, numDocs, useUUID);
+        return numDocs;
+    }
+
+    private class IncrementalBackupVerifier {
+        private BackupRepository repository;
+        private URI backupURI;
+        private String backupLocation;
+        private String backupName;
+        private BackupFilePaths incBackupFiles;
+
+        private Map<String, Collection<String>> lastShardCommitToBackupFiles = new HashMap<>();
+        // the first generation after calling backup is zero
+        private int numBackup = -1;
+        private int maxNumberOfBackupToKeep = 4;
+
+        IncrementalBackupVerifier(BackupRepository repository, String backupLocation,
+                                  String backupName, String collection, int maxNumberOfBackupToKeep) {
+            this.repository = repository;
+            this.backupLocation = backupLocation;
+            this.backupURI = repository.resolve(repository.createURI(backupLocation), backupName, collection);
+            this.incBackupFiles = new BackupFilePaths(repository, this.backupURI);
+            this.backupName = backupName;
+            this.maxNumberOfBackupToKeep = maxNumberOfBackupToKeep;
+        }
+
+        @SuppressWarnings("rawtypes")
+        private void backupThenWait() throws SolrServerException, IOException {
+            CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setMaxNumberBackupPoints(maxNumberOfBackupToKeep)
+                    .setRepositoryName(BACKUP_REPO_NAME);
+            if (random().nextBoolean()) {
+                try {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    assertEquals(RequestStatusState.COMPLETED, state);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                numBackup++;
+            } else {
+                CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                assertEquals(0, rsp.getStatus());
+                NamedList resp = (NamedList) rsp.getResponse().get("response");
+                numBackup++;
+                assertEquals(numBackup, resp.get("backupId"));;

Review comment:
       *NULL_DEREFERENCE:*  object `resp` last assigned on line 364 could be null and is dereferenced at line 366.
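
       A small guard would satisfy the checker; sketch only, and the assertion message is illustrative rather than part of the patch:

           NamedList<?> resp = (NamedList<?>) rsp.getResponse().get("response");
           assertNotNull("expected a 'response' section in the backup response", resp);
           numBackup++;
           assertEquals(numBackup, resp.get("backupId"));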

##########
File path: solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupProperties;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+
+/**
+ * Used to test the incremental method of backup/restoration (as opposed to the deprecated 'full snapshot' method).
+ *
+ * For a similar test harness for snapshot backup/restoration see {@link AbstractCloudBackupRestoreTestCase}
+ */
+public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static long docsSeed; // see indexDocs()
+    protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+    protected static final String BACKUPNAME_PREFIX = "mytestbackup";
+    protected static final String BACKUP_REPO_NAME = "trackingBackupRepository";
+
+    protected String testSuffix = "test1";
+    protected int replFactor;
+    protected int numTlogReplicas;
+    protected int numPullReplicas;
+
+    @BeforeClass
+    public static void createCluster() throws Exception {
+        docsSeed = random().nextLong();
+        System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    }
+
+    /**
+     * @return The name of the collection to use.
+     */
+    public abstract String getCollectionNamePrefix();
+
+    public String getCollectionName(){
+        return getCollectionNamePrefix() + "_" + testSuffix;
+    }
+
+    public void setTestSuffix(String testSuffix) {
+        this.testSuffix = testSuffix;
+    }
+
+    private void randomizeReplicaTypes() {
+        replFactor = TestUtil.nextInt(random(), 1, 2);
+//    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+//    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    }
+
+    /**
+     * @return The absolute path for the backup location.
+     *         Could return null.
+     */
+    public abstract String getBackupLocation();
+
+    @Test
+    public void testSimple() throws Exception {
+        final String backupCollectionName = getCollectionName();
+        final String restoreCollectionName = backupCollectionName + "_restore";
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupincsimple");
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
+                .process(solrClient);
+        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            long t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
+            expectedNumDocs += indexDocs(backupCollectionName, true);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            long numFound = cluster.getSolrClient().query(backupCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            log.info("Created backup with {} docs, took {}ms", numFound, timeTaken);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                    .setBackupId(0)
+                    .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).processAndWait(solrClient, 500);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Restored from backup, took {}ms", timeTaken);
+            numFound = cluster.getSolrClient().query(restoreCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            assertEquals(expectedNumDocs, numFound);
+        }
+    }
+
+    @Test
+    @Slow
+    @SuppressWarnings("rawtypes")
+    public void testBackupIncremental() throws Exception {
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupinc");
+        randomizeReplicaTypes();
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+                .process(solrClient);
+
+        indexDocs(getCollectionName(), false);
+
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
+
+            backupRestoreThenCheck(solrClient, verifier);
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            // adding more commits to trigger merging segments
+            for (int i = 0; i < 15; i++) {
+                indexDocs(getCollectionName(), 5, false);
+            }
+            backupRestoreThenCheck(solrClient, verifier);
+
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+            new UpdateRequest()
+                    .deleteByQuery("*:*")
+                    .commit(cluster.getSolrClient(), getCollectionName());
+            indexDocs(getCollectionName(), false);
+            // corrupt index files
+            corruptIndexFiles();
+            try {
+                log.info("Create backup after corrupt index files");
+                CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                        .setLocation(backupLocation)
+                        .setIncremental(true)
+                        .setMaxNumberBackupPoints(3)
+                        .setRepositoryName(BACKUP_REPO_NAME);
+                if (random().nextBoolean()) {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    if (state != RequestStatusState.FAILED) {
+                        fail("This backup should be failed");
+                    }
+                } else {
+                    CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                    fail("This backup should be failed");
+                }
+            } catch (Exception e) {
+                // expected
+                e.printStackTrace();
+            }
+        }
+    }
+
+    protected void corruptIndexFiles() throws IOException {
+        Collection<Slice> slices = getCollectionState(getCollectionName()).getSlices();
+        Slice slice = slices.iterator().next();
+        JettySolrRunner leaderNode = cluster.getReplicaJetty(slice.getLeader());
+
+        SolrCore solrCore = leaderNode.getCoreContainer().getCore(slice.getLeader().getCoreName());

Review comment:
       *NULL_DEREFERENCE:*  object returned by `leaderNode.getCoreContainer()` could be null and is dereferenced at line 252.
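
       If the container can really be absent here, a fail-fast lookup keeps the failure readable; sketch only, and it assumes an extra import of org.apache.solr.core.CoreContainer (java.util.Objects is already imported in this test):

           CoreContainer cores = Objects.requireNonNull(leaderNode.getCoreContainer(),
                   "leader jetty is not running a CoreContainer");
           SolrCore solrCore = cores.getCore(slice.getLeader().getCoreName());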

##########
File path: solr/test-framework/src/java/org/apache/solr/core/TrackingBackupRepository.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
+
+public class TrackingBackupRepository implements BackupRepository {
+    private static final List<URI> COPIED_FILES = Collections.synchronizedList(new ArrayList<>());
+
+    private BackupRepository delegate;
+
+    @Override
+    public <T> T getConfigProperty(String name) {
+        return delegate.getConfigProperty(name);
+    }
+
+    @Override
+    public URI createURI(String path) {
+        return delegate.createURI(path);
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        return delegate.resolve(baseUri, pathComponents);
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        return delegate.exists(path);
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        return delegate.getPathType(path);
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        return delegate.listAll(path);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) throws IOException {
+        return delegate.openInput(dirPath, fileName, ctx);
+    }
+
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        return delegate.createOutput(path);
+    }
+
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        delegate.createDirectory(path);
+    }
+
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        delegate.deleteDirectory(path);
+    }
+
+    @Override
+    public void copyIndexFileFrom(Directory sourceDir, String sourceFileName, URI destDir, String destFileName) throws IOException {
+        COPIED_FILES.add(delegate.resolve(destDir, destFileName));
+        delegate.copyIndexFileFrom(sourceDir, sourceFileName, destDir, destFileName);
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+
+    @Override
+    public void delete(URI path, Collection<String> files, boolean ignoreNoSuchFileException) throws IOException {
+        delegate.delete(path, files, ignoreNoSuchFileException);
+    }
+
+    @Override
+    public Checksum checksum(Directory dir, String fileName) throws IOException {
+        return delegate.checksum(dir, fileName);
+    }
+
+    @Override
+    public void init(@SuppressWarnings("rawtypes") NamedList args) {
+        BackupRepositoryFactory factory = (BackupRepositoryFactory) args.get("factory");
+        SolrResourceLoader loader = (SolrResourceLoader) args.get("loader");
+        String repoName = (String) args.get("delegateRepoName");
+
+        this.delegate = factory.newInstance(loader, repoName);

Review comment:
       *NULL_DEREFERENCE:*  object `factory` last assigned on line 115 could be null and is dereferenced at line 119.
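
       Since all three init args are needed to build the delegate, one option is to validate them up front instead of failing later with a NullPointerException; a sketch, assuming missing args should be treated as a configuration error:

           BackupRepositoryFactory factory = (BackupRepositoryFactory) args.get("factory");
           SolrResourceLoader loader = (SolrResourceLoader) args.get("loader");
           String repoName = (String) args.get("delegateRepoName");
           if (factory == null || loader == null || repoName == null) {
             throw new IllegalArgumentException(
                 "TrackingBackupRepository requires 'factory', 'loader' and 'delegateRepoName' init args");
           }
           this.delegate = factory.newInstance(loader, repoName);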

##########
File path: solr/core/src/java/org/apache/solr/handler/admin/BackupCoreOp.java
##########
@@ -18,56 +18,76 @@
 package org.apache.solr.handler.admin;
 
 import java.net.URI;
+import java.nio.file.Paths;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.backup.ShardBackupId;
 import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.apache.solr.handler.IncrementalShardBackup;
 import org.apache.solr.handler.SnapShooter;
 
 import static org.apache.solr.common.params.CommonParams.NAME;
 
-
 class BackupCoreOp implements CoreAdminHandler.CoreAdminOp {
   @Override
   public void execute(CoreAdminHandler.CallInfo it) throws Exception {
     final SolrParams params = it.req.getParams();
 
     String cname = params.required().get(CoreAdminParams.CORE);
     String name = params.required().get(NAME);
-
+    boolean incremental = params.getBool(CoreAdminParams.BACKUP_INCREMENTAL, true);
+    String shardBackupIdStr = params.get(CoreAdminParams.SHARD_BACKUP_ID, null);
+    String prevShardBackupIdStr = params.get(CoreAdminParams.PREV_SHARD_BACKUP_ID, null);
     String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
-    BackupRepository repository = it.handler.coreContainer.newBackupRepository(repoName);
-
-    String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
-    if (location == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
-          + " parameter or as a default repository property");
-    }
-
     // An optional parameter to describe the snapshot to be backed-up. If this
     // parameter is not supplied, the latest index commit is backed-up.
     String commitName = params.get(CoreAdminParams.COMMIT_NAME);
 
-    URI locationUri = repository.createURI(location);
-    try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
-      SnapShooter snapShooter = new SnapShooter(repository, core, locationUri, name, commitName);
-      // validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
-      //  But we want to throw. One reason is that
-      //  this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
-      //  file system. Otherwise, perhaps the FS location isn't shared -- we want an error.
-      if (!snapShooter.getBackupRepository().exists(snapShooter.getLocation())) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Directory to contain snapshots doesn't exist: " + snapShooter.getLocation() + ". " +
-            "Note that Backup/Restore of a SolrCloud collection " +
-            "requires a shared file system mounted at the same path on all nodes!");
+    try (BackupRepository repository = it.handler.coreContainer.newBackupRepository(repoName);
+         SolrCore core = it.handler.coreContainer.getCore(cname)) {
+      String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION));
+      if (location == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'location' is not specified as a query"
+                + " parameter or as a default repository property");
+      }
+
+      URI locationUri = repository.createURI(location);
+
+      if (incremental) {
+        if ("file".equals(locationUri.getScheme())) {
+          core.getCoreContainer().assertPathAllowed(Paths.get(location));

Review comment:
       *PATH_TRAVERSAL_IN:*  This API (java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;) reads a file whose location might be specified by user input [(details)](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN)
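
       The location is already funneled through assertPathAllowed, which is the Solr-side mitigation for this class of finding; if the intent should be spelled out at the call site, the path can be normalized explicitly before the check. Sketch only, and assertPathAllowed may well normalize internally already:

           if ("file".equals(locationUri.getScheme())) {
             // resolve ".." segments so a crafted location cannot step outside the allow-list
             java.nio.file.Path backupPath = Paths.get(location).toAbsolutePath().normalize();
             core.getCoreContainer().assertPathAllowed(backupPath);
           }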

##########
File path: solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupProperties;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+
+/**
+ * Used to test the incremental method of backup/restoration (as opposed to the deprecated 'full snapshot' method).
+ *
+ * For a similar test harness for snapshot backup/restoration see {@link AbstractCloudBackupRestoreTestCase}
+ */
+public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static long docsSeed; // see indexDocs()
+    protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+    protected static final String BACKUPNAME_PREFIX = "mytestbackup";
+    protected static final String BACKUP_REPO_NAME = "trackingBackupRepository";
+
+    protected String testSuffix = "test1";
+    protected int replFactor;
+    protected int numTlogReplicas;
+    protected int numPullReplicas;
+
+    @BeforeClass
+    public static void createCluster() throws Exception {
+        docsSeed = random().nextLong();
+        System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    }
+
+    /**
+     * @return The name of the collection to use.
+     */
+    public abstract String getCollectionNamePrefix();
+
+    public String getCollectionName(){
+        return getCollectionNamePrefix() + "_" + testSuffix;
+    }
+
+    public void setTestSuffix(String testSuffix) {
+        this.testSuffix = testSuffix;
+    }
+
+    private void randomizeReplicaTypes() {
+        replFactor = TestUtil.nextInt(random(), 1, 2);
+//    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+//    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    }
+
+    /**
+     * @return The absolute path for the backup location.
+     *         Could return null.
+     */
+    public abstract String getBackupLocation();
+
+    @Test
+    public void testSimple() throws Exception {
+        final String backupCollectionName = getCollectionName();
+        final String restoreCollectionName = backupCollectionName + "_restore";
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupincsimple");
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
+                .process(solrClient);
+        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            long t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
+            expectedNumDocs += indexDocs(backupCollectionName, true);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            long numFound = cluster.getSolrClient().query(backupCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            log.info("Created backup with {} docs, took {}ms", numFound, timeTaken);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                    .setBackupId(0)
+                    .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).processAndWait(solrClient, 500);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Restored from backup, took {}ms", timeTaken);
+            numFound = cluster.getSolrClient().query(restoreCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            assertEquals(expectedNumDocs, numFound);
+        }
+    }
+
+    @Test
+    @Slow
+    @SuppressWarnings("rawtypes")
+    public void testBackupIncremental() throws Exception {
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupinc");
+        randomizeReplicaTypes();
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+                .process(solrClient);
+
+        indexDocs(getCollectionName(), false);
+
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
+
+            backupRestoreThenCheck(solrClient, verifier);
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            // adding more commits to trigger merging segments
+            for (int i = 0; i < 15; i++) {
+                indexDocs(getCollectionName(), 5, false);
+            }
+            backupRestoreThenCheck(solrClient, verifier);
+
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+            new UpdateRequest()
+                    .deleteByQuery("*:*")
+                    .commit(cluster.getSolrClient(), getCollectionName());
+            indexDocs(getCollectionName(), false);
+            // corrupt index files
+            corruptIndexFiles();
+            try {
+                log.info("Create backup after corrupt index files");
+                CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                        .setLocation(backupLocation)
+                        .setIncremental(true)
+                        .setMaxNumberBackupPoints(3)
+                        .setRepositoryName(BACKUP_REPO_NAME);
+                if (random().nextBoolean()) {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    if (state != RequestStatusState.FAILED) {
+                        fail("This backup should be failed");
+                    }
+                } else {
+                    CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                    fail("This backup should be failed");
+                }
+            } catch (Exception e) {
+                // expected
+                e.printStackTrace();
+            }
+        }
+    }
+
+    protected void corruptIndexFiles() throws IOException {
+        Collection<Slice> slices = getCollectionState(getCollectionName()).getSlices();
+        Slice slice = slices.iterator().next();
+        JettySolrRunner leaderNode = cluster.getReplicaJetty(slice.getLeader());
+
+        SolrCore solrCore = leaderNode.getCoreContainer().getCore(slice.getLeader().getCoreName());
+        Set<String> fileNames = new HashSet<>(solrCore.getDeletionPolicy().getLatestCommit().getFileNames());
+        File indexFolder = new File(solrCore.getIndexDir());

Review comment:
       *PATH_TRAVERSAL_IN:*  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose location might be specified by user input [(details)](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN)
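
       Here the directory comes from solrCore.getIndexDir(), i.e. from the core under test rather than from request input, so the finding looks benign for test code. If it should be silenced anyway, the listing can go through NIO with an explicit failure when no commit file is found; a sketch that assumes java.nio.file.Files/Path/Paths imports (not currently in the test) and only replaces the directory scan:

           Path indexFolder = Paths.get(solrCore.getIndexDir());
           Path fileGetCorrupted;
           try (Stream<Path> files = Files.list(indexFolder)) {
             fileGetCorrupted = files
                 .filter(p -> fileNames.contains(p.getFileName().toString()))
                 .findAny()
                 .orElseThrow(() -> new IllegalStateException("no index file found to corrupt"));
           }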

##########
File path: solr/test-framework/src/java/org/apache/solr/cloud/api/collections/AbstractIncrementalBackupTest.java
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.BackupId;
+import org.apache.solr.core.backup.BackupProperties;
+import org.apache.solr.core.backup.Checksum;
+import org.apache.solr.core.backup.ShardBackupId;
+import org.apache.solr.core.backup.ShardBackupMetadata;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.BackupFilePaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+
+/**
+ * Used to test the incremental method of backup/restoration (as opposed to the deprecated 'full snapshot' method).
+ *
+ * For a similar test harness for snapshot backup/restoration see {@link AbstractCloudBackupRestoreTestCase}
+ */
+public abstract class AbstractIncrementalBackupTest extends SolrCloudTestCase {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static long docsSeed; // see indexDocs()
+    protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
+    protected static final String BACKUPNAME_PREFIX = "mytestbackup";
+    protected static final String BACKUP_REPO_NAME = "trackingBackupRepository";
+
+    protected String testSuffix = "test1";
+    protected int replFactor;
+    protected int numTlogReplicas;
+    protected int numPullReplicas;
+
+    @BeforeClass
+    public static void createCluster() throws Exception {
+        docsSeed = random().nextLong();
+        System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    }
+
+    /**
+     * @return The name of the collection to use.
+     */
+    public abstract String getCollectionNamePrefix();
+
+    public String getCollectionName(){
+        return getCollectionNamePrefix() + "_" + testSuffix;
+    }
+
+    public void setTestSuffix(String testSuffix) {
+        this.testSuffix = testSuffix;
+    }
+
+    private void randomizeReplicaTypes() {
+        replFactor = TestUtil.nextInt(random(), 1, 2);
+//    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+//    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    }
+
+    /**
+     * @return The absolute path for the backup location.
+     *         Could return null.
+     */
+    public abstract String getBackupLocation();
+
+    @Test
+    public void testSimple() throws Exception {
+        final String backupCollectionName = getCollectionName();
+        final String restoreCollectionName = backupCollectionName + "_restore";
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupincsimple");
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(backupCollectionName, "conf1", NUM_SHARDS, 1)
+                .process(solrClient);
+        int expectedNumDocs = indexDocs(backupCollectionName, true);
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            long t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Created backup with {} docs, took {}ms", expectedNumDocs, timeTaken);
+            expectedNumDocs += indexDocs(backupCollectionName, true);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.backupCollection(backupCollectionName, backupName)
+                    .setLocation(backupLocation)
+                    .setIncremental(true)
+                    .setRepositoryName(BACKUP_REPO_NAME)
+                    .processAndWait(cluster.getSolrClient(), 100);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            long numFound = cluster.getSolrClient().query(backupCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            log.info("Created backup with {} docs, took {}ms", numFound, timeTaken);
+
+            t = System.nanoTime();
+            CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                    .setBackupId(0)
+                    .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).processAndWait(solrClient, 500);
+            timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t);
+            log.info("Restored from backup, took {}ms", timeTaken);
+            numFound = cluster.getSolrClient().query(restoreCollectionName,
+                    new SolrQuery("*:*")).getResults().getNumFound();
+            assertEquals(expectedNumDocs, numFound);
+        }
+    }
+
+    @Test
+    @Slow
+    @SuppressWarnings("rawtypes")
+    public void testBackupIncremental() throws Exception {
+        TrackingBackupRepository.clear();
+
+        setTestSuffix("testbackupinc");
+        randomizeReplicaTypes();
+        CloudSolrClient solrClient = cluster.getSolrClient();
+
+        CollectionAdminRequest
+                .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+                .process(solrClient);
+
+        indexDocs(getCollectionName(), false);
+
+        String backupName = BACKUPNAME_PREFIX + testSuffix;
+        try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+                .newBackupRepository(BACKUP_REPO_NAME)) {
+            String backupLocation = repository.getBackupLocation(getBackupLocation());
+            URI uri = repository.resolve(repository.createURI(backupLocation), backupName);
+            BackupFilePaths backupPaths = new BackupFilePaths(repository, uri);
+            IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName, getCollectionName(), 3);
+
+            backupRestoreThenCheck(solrClient, verifier);
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            // adding more commits to trigger merging segments
+            for (int i = 0; i < 15; i++) {
+                indexDocs(getCollectionName(), 5, false);
+            }
+            backupRestoreThenCheck(solrClient, verifier);
+
+            indexDocs(getCollectionName(), false);
+            backupRestoreThenCheck(solrClient, verifier);
+
+            simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+            new UpdateRequest()
+                    .deleteByQuery("*:*")
+                    .commit(cluster.getSolrClient(), getCollectionName());
+            indexDocs(getCollectionName(), false);
+            // corrupt index files
+            corruptIndexFiles();
+            try {
+                log.info("Create backup after corrupt index files");
+                CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+                        .setLocation(backupLocation)
+                        .setIncremental(true)
+                        .setMaxNumberBackupPoints(3)
+                        .setRepositoryName(BACKUP_REPO_NAME);
+                if (random().nextBoolean()) {
+                    RequestStatusState state = backup.processAndWait(cluster.getSolrClient(), 1000);
+                    if (state != RequestStatusState.FAILED) {
+                        fail("This backup should be failed");
+                    }
+                } else {
+                    CollectionAdminResponse rsp = backup.process(cluster.getSolrClient());
+                    fail("This backup should be failed");
+                }
+            } catch (Exception e) {
+                // expected
+                e.printStackTrace();
+            }
+        }
+    }
+
+    protected void corruptIndexFiles() throws IOException {
+        Collection<Slice> slices = getCollectionState(getCollectionName()).getSlices();
+        Slice slice = slices.iterator().next();
+        JettySolrRunner leaderNode = cluster.getReplicaJetty(slice.getLeader());
+
+        SolrCore solrCore = leaderNode.getCoreContainer().getCore(slice.getLeader().getCoreName());
+        Set<String> fileNames = new HashSet<>(solrCore.getDeletionPolicy().getLatestCommit().getFileNames());
+        File indexFolder = new File(solrCore.getIndexDir());
+        File fileGetCorrupted = Stream.of(Objects.requireNonNull(indexFolder.listFiles()))
+                .filter(x -> fileNames.contains(x.getName()))
+                .findAny().get();
+        try (FileInputStream fis = new FileInputStream(fileGetCorrupted)){
+            byte[] contents = IOUtils.readFully(fis, (int) fileGetCorrupted.length());
+            contents[contents.length - CodecUtil.footerLength() - 1] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 2] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 3] += 1;
+            contents[contents.length - CodecUtil.footerLength() - 4] += 1;
+            try (FileOutputStream fos = new FileOutputStream(fileGetCorrupted)) {
+                IOUtils.write(contents, fos);
+            }
+        } finally {
+            solrCore.close();
+        }
+    }
+
+    private void backupRestoreThenCheck(CloudSolrClient solrClient,
+                                        IncrementalBackupVerifier verifier) throws Exception {
+        verifier.incrementalBackupThenVerify();
+
+        if (random().nextBoolean()) {
+            simpleRestoreAndCheckDocCount(solrClient, verifier.backupLocation, verifier.backupName);
+        }
+    }
+
+    private void simpleRestoreAndCheckDocCount(CloudSolrClient solrClient, String backupLocation, String backupName) throws Exception{
+        Map<String, Integer> origShardToDocCount = AbstractCloudBackupRestoreTestCase.getShardToDocCountMap(solrClient, getCollectionState(getCollectionName()));
+
+        String restoreCollectionName = getCollectionName() + "_restored";
+
+        CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+                .setLocation(backupLocation).setRepositoryName(BACKUP_REPO_NAME).process(solrClient);
+
+        AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+                restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
+
+        // check num docs are the same
+        assertEquals(origShardToDocCount, AbstractCloudBackupRestoreTestCase.getShardToDocCountMap(solrClient, getCollectionState(restoreCollectionName)));
+
+        // this method may get invoked multiple times, so the restored collection must be cleaned up
+        CollectionAdminRequest.deleteCollection(restoreCollectionName).process(solrClient);
+    }
+
+
+    private void indexDocs(String collectionName, int numDocs, boolean useUUID) throws Exception {
+        Random random = new Random(docsSeed);

Review comment:
       *PREDICTABLE_RANDOM:*  This random generator (java.util.Random) is predictable [(details)](https://find-sec-bugs.github.io/bugs.htm#PREDICTABLE_RANDOM)
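
       The predictable generator is deliberate in this test: docsSeed is reused so later calls can regenerate exactly the same documents, which makes this a finding to suppress rather than fix. If unpredictable values were actually needed, SecureRandom is a drop-in subclass of Random; sketch, assuming an import of java.security.SecureRandom:

           // only if reproducibility were not wanted; this test relies on the fixed seed
           Random random = new SecureRandom();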




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org