You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2021/02/12 20:33:39 UTC

[lucene-solr] branch branch_8_8 updated: SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (closes #2359 and #2318)

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch branch_8_8
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8_8 by this push:
     new 22c716b  SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (closes #2359 and #2318)
22c716b is described below

commit 22c716bcd946fa2d49e6cea53c0f0dd689954d76
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Sat Feb 13 01:57:39 2021 +0530

    SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (closes #2359 and #2318)
---
 solr/CHANGES.txt                                   |  3 +
 .../src/java/org/apache/solr/cloud/Overseer.java   | 24 +++++-
 .../solr/cloud/RefreshCollectionMessage.java       | 51 ++++++++++++
 .../cloud/api/collections/CreateCollectionCmd.java | 97 +++++++++++++++++-----
 .../apache/solr/common/cloud/PerReplicaStates.java | 14 ++++
 .../solr/common/cloud/PerReplicaStatesOps.java     | 18 ++--
 .../client/solrj/impl/CloudSolrClientTest.java     |  8 ++
 7 files changed, 185 insertions(+), 30 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 1fe80b6..7831efd 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,6 +17,9 @@ Bug Fixes
 
 * SOLR-15136: Reduce excessive logging introduced with Per Replica States feature (Ishan Chattopadhyaya)
 
+* SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections
+  (Mike Drob, Ilan Ginzburg, noble, Ishan Chattopadhyaya)
+
 ==================  8.8.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3f52f74..4ca5703 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -95,6 +96,7 @@ public class Overseer implements SolrCloseable {
 
   public static final int NUM_RESPONSES_TO_STORE = 10000;
   public static final String OVERSEER_ELECT = "/overseer_elect";
+  private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
 
   private SolrMetricsContext solrMetricsContext;
   private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@@ -198,8 +200,6 @@ public class Overseer implements SolrCloseable {
                 if (log.isDebugEnabled()) {
                   log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getZkStats().getQueueLength(), message);
                 }
-                // force flush to ZK after each message because there is no fallback if workQueue items
-                // are removed from workQueue but fail to be written to ZK
                 try {
                   clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
                 } catch (Exception e) {
@@ -260,6 +260,13 @@ public class Overseer implements SolrCloseable {
 
                 processedNodes.add(head.first());
                 fallbackQueueSize = processedNodes.size();
+                // force flush to ZK after each message because there is no fallback if workQueue items
+                // are removed from workQueue but fail to be written to ZK
+                while (unprocessedMessages.size() > 0) {
+                  clusterState = zkStateWriter.writePendingUpdates();
+                  Message m = unprocessedMessages.remove(0);
+                  clusterState = m.run(clusterState, Overseer.this);
+                }
                 // The callback always be called on this thread
                 clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
                   stateUpdateQueue.remove(processedNodes);
@@ -1018,4 +1025,17 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data);
   }
 
+  /**
+   * Queue an intra-process message; it is picked up and executed the next time
+   * {@link ClusterStateUpdater}'s loop drains {@code unprocessedMessages}.
+   */
+  public void submit(Message message) {
+    unprocessedMessages.add(message);
+  }
+
+  /** A unit of work handed the current cluster state; the state it returns replaces it. */
+  public interface Message {
+    ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+  }
+
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
new file mode 100644
index 0000000..2f221af
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Refresh the Cluster State for a given collection by comparing the locally held
+ * {@link DocCollection} against the collection's ZK node and re-reading it when stale.
+ */
+public class RefreshCollectionMessage implements Overseer.Message {
+  public final String collection;
+
+  public RefreshCollectionMessage(String collection) {
+    this.collection = collection;
+  }
+
+  public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+    Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
+    if (stat == null) {
+      // no ZK node at the collection path: the collection does not exist, so map it to null
+      return clusterState.copyWith(collection, null);
+    }
+    DocCollection coll = clusterState.getCollectionOrNull(collection);
+    if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
+      // our state is up to date: isModified reports no change for the node's version/cversion
+      return clusterState;
+    } else { // collection unknown locally, or changed in ZK: re-read it and swap it in
+      coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+      return clusterState.copyWith(collection, coll);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a9299cc..6d9e67e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -41,9 +42,12 @@ import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.RefreshCollectionMessage;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
@@ -51,6 +55,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkConfigManager;
@@ -114,6 +119,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
     final String alias = message.getStr(ALIAS, collectionName);
     log.info("Create collection {}", collectionName);
+    final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
     if (clusterState.hasCollection(collectionName)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
     }
@@ -148,6 +154,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     // fail fast if parameters are wrong or incomplete
     List<String> shardNames = populateShardNames(message, router);
     checkReplicaTypes(message);
+    DocCollection newColl = null;
+    final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
 
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 
@@ -170,27 +178,41 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
-      
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-
-      // wait for a while until we see the collection
-      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-      boolean created = false;
-      while (! waitUntil.hasTimedOut()) {
-        waitUntil.sleep(100);
-        created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
-        if (created) break;
-      }
-      if (!created) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+      if (isPRS) {
+        // In case of a PRS collection, create the collection structure directly instead of resubmitting
+        // to the overseer queue.
+        // TODO: Consider doing this for all collections, not just the PRS collections.
+        ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
+        byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+        ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
+        clusterState = clusterState.copyWith(collectionName, command.collection);
+        newColl = command.collection;
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+
+        // wait for a while until we see the collection
+        TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+        boolean created = false;
+        while (!waitUntil.hasTimedOut()) {
+          waitUntil.sleep(100);
+          created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+          if (created) break;
+        }
+        if (!created) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+        }
+
+        // refresh cluster state
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        newColl = clusterState.getCollection(collectionName);
+
       }
 
-      // refresh cluster state
-      clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
 
       List<ReplicaPosition> replicaPositions = null;
       try {
-        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames, sessionWrapper);
+        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, newColl, message, shardNames, sessionWrapper);
       } catch (Assign.AssignmentException e) {
         ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
         new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -230,7 +252,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         }
 
         String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
-            ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
+            newColl,
             replicaPosition.shard, replicaPosition.type, true);
         if (log.isDebugEnabled()) {
           log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
@@ -250,7 +272,18 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               ZkStateReader.NODE_NAME_PROP, nodeName,
               ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
               CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          if (isPRS) {
+            // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
+            // to the overseer queue.
+            // TODO: Consider doing this for all collections, not just the PRS collections.
+            ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
+            byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+            ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+            clusterState = clusterState.copyWith(collectionName, command.collection);
+            newColl = command.collection;
+          } else {
+            ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          }
         }
 
         // Need to create new params for each request
@@ -289,7 +322,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       if(!isLegacyCloud) {
         // wait for all replica entries to be created
-        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        Map<String, Replica> replicas ;
+        if (isPRS) {
+          replicas = new ConcurrentHashMap<>();
+          newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
+              .filter(r -> coresToCreate.containsKey(r.getCoreName()))       // Only the elements that were asked for...
+              .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
+        } else {
+          replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        }
+
         for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
           ShardRequest sreq = e.getValue();
           sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
@@ -300,6 +342,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
+      if (isPRS) {
+        TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
+        PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        while (!timeout.hasTimedOut()) {
+          if(prs.allActive()) break;
+          Thread.sleep(100);
+          prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        }
+        if (prs.allActive()) {
+          // we have successfully found all replicas to be ACTIVE
+        } else {
+          failure = true;
+        }
+        // Now ask Overseer to fetch the latest state of collection
+        // from ZK
+        ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+      }
       if (failure) {
         // Let's cleanup as we hit an exception
         // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index af4bb43..975f654 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 import org.apache.solr.cluster.api.SimpleMap;
@@ -67,6 +68,8 @@ public class PerReplicaStates implements ReflectMapWriter {
   @JsonProperty
   public final SimpleMap<State> states;
 
+  private Boolean allActive;
+
   /**
    * Construct with data read from ZK
    * @param path path from where this is loaded
@@ -92,6 +95,17 @@ public class PerReplicaStates implements ReflectMapWriter {
 
   }
 
+  /** Returns true only if every entry in {@link #states} is ACTIVE (also true when there are no
+   * entries); the answer is computed on the first call and cached for later calls on this instance. */
+  public boolean allActive() {
+    if (this.allActive != null) return allActive;
+    AtomicBoolean result = new AtomicBoolean(true);
+    states.forEachEntry((r, s) -> {
+      if (s.state != Replica.State.ACTIVE) result.set(false);
+    });
+    return this.allActive = result.get();
+  }
+
   /**Get the changed replicas
    */
   public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 1bf0ecc..3d3a184 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -117,7 +117,7 @@ public class PerReplicaStatesOps {
   public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
-      PerReplicaStates.State existing = rs.get(replica);
+      PerReplicaStates.State existing = prs.get(replica);
       if (existing == null) {
         operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
       } else {
@@ -125,7 +125,7 @@ public class PerReplicaStatesOps {
         addDeleteStaleNodes(operations, existing);
       }
       if (log.isDebugEnabled()) {
-        log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
+        log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
       }
       return operations;
     }).init(rs);
@@ -161,7 +161,7 @@ public class PerReplicaStatesOps {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> ops = new ArrayList<>();
       if (next != null) {
-        PerReplicaStates.State st = rs.get(next);
+        PerReplicaStates.State st = prs.get(next);
         if (st != null) {
           if (!st.isLeader) {
             ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
@@ -177,7 +177,7 @@ public class PerReplicaStatesOps {
 
       // now go through all other replicas and unset previous leader
       for (String r : allReplicas) {
-        PerReplicaStates.State st = rs.get(r);
+        PerReplicaStates.State st = prs.get(r);
         if (st == null) continue;//unlikely
         if (!Objects.equals(r, next)) {
           if (st.isLeader) {
@@ -188,7 +188,7 @@ public class PerReplicaStatesOps {
         }
       }
       if (log.isDebugEnabled()) {
-        log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
+        log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
       }
       return ops;
     }).init(rs);
@@ -202,10 +202,10 @@ public class PerReplicaStatesOps {
   public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> result;
-      if (rs == null) {
+      if (prs == null) {
         result = Collections.emptyList();
       } else {
-        PerReplicaStates.State state = rs.get(replica);
+        PerReplicaStates.State state = prs.get(replica);
         result = addDeleteStaleNodes(new ArrayList<>(), state);
       }
       return result;
@@ -224,7 +224,7 @@ public class PerReplicaStatesOps {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> operations = new ArrayList<>();
       for (String replica : replicas) {
-        PerReplicaStates.State r = rs.get(replica);
+        PerReplicaStates.State r = prs.get(replica);
         if (r != null) {
           if (r.state == Replica.State.DOWN && !r.isLeader) continue;
           operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
@@ -234,7 +234,7 @@ public class PerReplicaStatesOps {
         }
       }
       if (log.isDebugEnabled()) {
-        log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
+        log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
       }
       return operations;
     }).init(rs);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 171785c..b85499d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -1099,6 +1099,14 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
     PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
     assertEquals(4, prs.states.size());
+
+    // Now let's do an add replica
+    CollectionAdminRequest
+        .addReplicaToShard(testCollection, "shard1")
+        .process(cluster.getSolrClient());
+    prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
+    assertEquals(5, prs.states.size());
+
     testCollection = "perReplicaState_testv2";
     new V2Request.Builder("/collections")
         .withMethod(POST)