You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2023/01/03 10:52:32 UTC

[solr] 01/01: All changes from github.com/apache/solr/pull/1242

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/SOLR-16580_9_1
in repository https://gitbox.apache.org/repos/asf/solr.git

commit ce9101db9ff032ccb724edfd68f490badb61158e
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Jan 3 21:51:51 2023 +1100

    All changes from github.com/apache/solr/pull/1242
---
 .../solr/cloud/DistributedClusterStateUpdater.java |  5 +-
 .../solr/cloud/overseer/ClusterStateMutator.java   |  4 +-
 .../solr/cloud/overseer/CollectionMutator.java     |  7 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 15 +++-
 .../org/apache/solr/core/backup/BackupManager.java |  2 +-
 .../apache/solr/handler/admin/ClusterStatus.java   |  5 ++
 .../solr/handler/admin/CollectionsHandler.java     |  6 +-
 .../org/apache/solr/cloud/ClusterStateTest.java    | 10 +--
 .../OverseerCollectionConfigSetProcessorTest.java  |  8 +-
 .../test/org/apache/solr/cloud/SliceStateTest.java |  2 +-
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 70 ++++++++++++----
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 11 ++-
 .../client/solrj/cloud/DistribStateManager.java    | 12 +++
 .../solrj/impl/ZkClientClusterStateProvider.java   |  8 +-
 .../solr/common/cloud/PerReplicaStatesFetcher.java | 10 +++
 .../apache/solr/common/cloud/ZkStateReader.java    | 17 ----
 .../solrj/impl/BaseHttpClusterStateProvider.java   | 27 +++++-
 .../org/apache/solr/common/cloud/ClusterState.java | 92 +++++---------------
 .../apache/solr/common/cloud/DocCollection.java    | 97 ++++++++++++++--------
 .../apache/solr/common/cloud/PerReplicaStates.java | 20 +++++
 .../java/org/apache/solr/common/cloud/Replica.java | 49 ++++++-----
 .../java/org/apache/solr/common/cloud/Slice.java   | 26 +++++-
 .../solrj/impl/CloudSolrClientCacheTest.java       |  2 +-
 .../cloud/PerReplicaStatesIntegrationTest.java     |  8 +-
 24 files changed, 326 insertions(+), 187 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index ca745b55ff6..2ee7cba3239 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -628,13 +628,16 @@ public class DistributedClusterStateUpdater {
       // This factory method can detect a missing configName and supply it by reading it from the
       // old ZK location.
       // TODO in Solr 10 remove that factory method
-      ClusterState clusterState =
+
+      ClusterState clusterState;
+      clusterState =
           ZkClientClusterStateProvider.createFromJsonSupportingLegacyConfigName(
               stat.getVersion(),
               data,
               Collections.emptySet(),
               updater.getCollectionName(),
               zkStateReader.getZkClient());
+
       return clusterState;
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index f979d47a9af..f95959e33e5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -125,7 +125,9 @@ public class ClusterStateMutator {
     }
 
     assert !collectionProps.containsKey(CollectionAdminParams.COLL_CONF);
-    DocCollection newCollection = new DocCollection(cName, slices, collectionProps, router, -1);
+    DocCollection newCollection =
+        new DocCollection(
+            cName, slices, collectionProps, router, -1, stateManager.getPrsSupplier(cName));
 
     return new ZkWriteCommand(cName, newCollection);
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 6269e7ba329..492fe9efeb5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -172,7 +172,12 @@ public class CollectionMutator {
 
     DocCollection collection =
         new DocCollection(
-            coll.getName(), coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion());
+            coll.getName(),
+            coll.getSlicesMap(),
+            props,
+            coll.getRouter(),
+            coll.getZNodeVersion(),
+            stateManager.getPrsSupplier(coll.getName()));
     if (replicaOps == null) {
       return new ZkWriteCommand(coll.getName(), collection);
     } else {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 8fb0ce287fc..6aa561632cd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -280,13 +280,24 @@ public class ZkStateWriter {
               Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
               DocCollection newCollection =
                   new DocCollection(
-                      name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion());
+                      name,
+                      c.getSlicesMap(),
+                      c.getProperties(),
+                      c.getRouter(),
+                      stat.getVersion(),
+                      new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             } else {
               log.debug("going to create_collection {}", path);
               reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
               DocCollection newCollection =
-                  new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0);
+                  new DocCollection(
+                      name,
+                      c.getSlicesMap(),
+                      c.getProperties(),
+                      c.getRouter(),
+                      0,
+                      new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index 647886bca6a..660ea80cbaa 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -219,7 +219,7 @@ public class BackupManager {
         repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
-      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet());
+      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet(), null);
       return c_state.getCollection(collectionName);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index c1a7c31a02d..9130fcc317a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -188,6 +189,10 @@ public class ClusterStatus {
       }
       String configName = clusterStateCollection.getConfigName();
       collectionStatus.put("configName", configName);
+      if (message.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
+        PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
+        collectionStatus.put("PRS", prs);
+      }
       collectionProps.add(name, collectionStatus);
     }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 801b56d743c..cdaf70fb453 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1237,7 +1237,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         CLUSTERSTATUS,
         (req, rsp, h) -> {
           Map<String, Object> all =
-              copy(req.getParams(), null, COLLECTION_PROP, SHARD_ID_PROP, _ROUTE_);
+              copy(req.getParams(), null, COLLECTION_PROP, SHARD_ID_PROP, _ROUTE_, "prs");
           new ClusterStatus(
                   h.coreContainer.getZkController().getZkStateReader(), new ZkNodeProps(all))
               .getClusterStatus(rsp.getValues());
@@ -2010,7 +2010,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
                       replicas.add(shard.getLeader());
                     }
                     for (Replica replica : replicas) {
-                      String state = replica.getStr(ZkStateReader.STATE_PROP);
+                      State state = replica.getState();
                       if (log.isDebugEnabled()) {
                         log.debug(
                             "Checking replica status, collection={} replica={} state={}",
@@ -2019,7 +2019,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
                             state);
                       }
                       if (!n.contains(replica.getNodeName())
-                          || !state.equals(Replica.State.ACTIVE.toString())) {
+                          || !state.equals(Replica.State.ACTIVE)) {
                         replicaNotAliveCnt++;
                         return false;
                       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index 1af173271e0..7acb960828a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -59,14 +59,14 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     Slice slice2 = new Slice("shard2", sliceToProps, null, "collection1");
     slices.put("shard2", slice2);
     collectionStates.put(
-        "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT));
+        "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT, 0, null));
     collectionStates.put(
-        "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT));
+        "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT, 0, null));
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
     // System.out.println("#################### " + new String(bytes));
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
@@ -90,13 +90,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
             .get("node1")
             .getStr("prop2"));
 
-    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes, null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
 
-    loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes, null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index f90e3805405..c2fcc138c2c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -661,6 +661,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     handleCreateCollMessageProps(ZkNodeProps.load(bytes));
   }
 
+  @SuppressWarnings("DirectInvocationOnMock")
   private void handleCreateCollMessageProps(ZkNodeProps props) {
     log.info("track created replicas / collections");
     try {
@@ -676,7 +677,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
               collName,
               new ClusterState.CollectionRef(
                   new DocCollection(
-                      collName, new HashMap<>(), props.getProperties(), DocRouter.DEFAULT)));
+                      collName,
+                      new HashMap<>(),
+                      props.getProperties(),
+                      DocRouter.DEFAULT,
+                      0,
+                      distribStateManagerMock.getPrsSupplier(collName))));
       }
       if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) {
         replicas.add(props);
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index f12b9597ccb..4517cc1bc15 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -59,7 +59,7 @@ public class SliceStateTest extends SolrTestCaseJ4 {
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
 
     assertSame(
         "Default state not set to active",
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 73093e41f3f..a839f3e6854 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -51,12 +51,15 @@ import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
+import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@LogLevel(
+    "org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG")
 public class ZkStateReaderTest extends SolrTestCaseJ4 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final long TIMEOUT = 30;
@@ -132,7 +135,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
                 new HashMap<>(),
                 Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                 DocRouter.DEFAULT,
-                0));
+                0,
+                new PerReplicaStatesFetcher.LazyPrsSupplier(
+                    fixture.zkClient, DocCollection.getCollectionPath("c1"))));
 
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
     writer.writePendingUpdates();
@@ -157,7 +162,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -168,7 +175,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     Map<String, Object> props = new HashMap<>();
     props.put("x", "y");
     props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
-    state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
+    state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            props,
+            DocRouter.DEFAULT,
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -207,7 +222,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -239,7 +256,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
     clusterState = writer.writePendingUpdates();
@@ -256,7 +275,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
     assertFalse(ref.isLazilyLoaded());
     assertEquals(0, ref.get().getZNodeVersion());
-    assertEquals(-1, ref.get().getChildNodesVersion());
+    // dummy node created +1 and deleted +1 so 2
+    assertEquals(2, ref.get().getChildNodesVersion());
 
     DocCollection collection = ref.get();
     PerReplicaStates prs =
@@ -275,7 +295,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
     ref = reader.getClusterState().getCollectionRef("c1");
     assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
-    assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now
+    assertEquals(3, ref.get().getChildNodesVersion()); // but child version should be 1 now
 
     prs = ref.get().getPerReplicaStates();
     PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
@@ -289,7 +309,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     ref = reader.getClusterState().getCollectionRef("c1");
     assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
     // but child version should be 3 now (1 del + 1 add)
-    assertEquals(3, ref.get().getChildNodesVersion());
+    assertEquals(5, ref.get().getChildNodesVersion());
 
     // now delete the collection
     wc = new ZkWriteCommand("c1", null);
@@ -314,7 +334,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     ref = reader.getClusterState().getCollectionRef("c1");
     assertFalse(ref.isLazilyLoaded());
     assertEquals(0, ref.get().getZNodeVersion());
-    assertEquals(-1, ref.get().getChildNodesVersion()); // child node version is reset
+    assertEquals(2, ref.get().getChildNodesVersion()); // child node version is reset
 
     // re-add PRS
     collection = ref.get();
@@ -335,7 +355,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     ref = reader.getClusterState().getCollectionRef("c1");
 
     // child version should be reset since the state.json node was deleted and re-created
-    assertEquals(1, ref.get().getChildNodesVersion());
+    assertEquals(3, ref.get().getChildNodesVersion());
   }
 
   public void testForciblyRefreshAllClusterState() throws Exception {
@@ -354,9 +374,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
         new DocCollection(
             "c1",
             new HashMap<>(),
-            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            Map.of(
+                ZkStateReader.CONFIGNAME_PROP,
+                ConfigSetsHandler.DEFAULT_CONFIGSET_NAME,
+                DocCollection.CollectionStateProps.PER_REPLICA_STATE,
+                "true"),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -376,7 +402,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            ref.get().getZNodeVersion());
+            ref.get().getZNodeVersion(),
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -397,7 +425,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c2")));
     ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
 
     writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
@@ -432,7 +462,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
     DocCollection state2 =
         new DocCollection(
@@ -440,7 +472,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            new PerReplicaStatesFetcher.LazyPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath("c1")));
 
     // do not listen to c2
     fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
@@ -493,7 +527,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
                             ZkStateReader.CONFIGNAME_PROP,
                             ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                         DocRouter.DEFAULT,
-                        currentVersion);
+                        currentVersion,
+                        new PerReplicaStatesFetcher.LazyPrsSupplier(
+                            fixture.zkClient, DocCollection.getCollectionPath("c1")));
                 ZkWriteCommand wc = new ZkWriteCommand("c1", state);
                 writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
                 clusterState = writer.writePendingUpdates();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index be38ec88fe6..27ba3dda4b8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -32,6 +32,7 @@ import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -161,7 +162,15 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         prsProps.put("perReplicaState", Boolean.TRUE);
         ZkWriteCommand prs1 =
             new ZkWriteCommand(
-                "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+                "prs1",
+                new DocCollection(
+                    "prs1",
+                    new HashMap<>(),
+                    prsProps,
+                    DocRouter.DEFAULT,
+                    0,
+                    new PerReplicaStatesFetcher.LazyPrsSupplier(
+                        zkClient, DocCollection.getCollectionPath("c1"))));
         ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
         // First write is flushed immediately
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
index 09be10c73c9..7f2189a63f1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -123,6 +124,17 @@ public interface DistribStateManager extends SolrCloseable {
     throw new UnsupportedOperationException("Not implemented");
   }
 
+  default DocCollection.PrsSupplier getPrsSupplier(String collName) {
+    return new DocCollection.PrsSupplier(
+        () -> {
+          try {
+            return getReplicaStates(DocCollection.getCollectionPath(collName));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
   /**
    * Remove data recursively.
    *
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index cfb0911a142..98c89b1ee0d 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -28,6 +28,7 @@ import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -113,7 +114,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
         }
       }
     }
-    return ClusterState.createFromCollectionMap(version, stateMap, liveNodes);
+    return ClusterState.createFromCollectionMap(
+        version,
+        stateMap,
+        liveNodes,
+        new PerReplicaStatesFetcher.LazyPrsSupplier(
+            zkClient, DocCollection.getCollectionPath(coll)));
   }
 
   @Override
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
index f959cab3e03..c8baa8b4f07 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
@@ -17,13 +17,17 @@
 
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.List;
 import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PerReplicaStatesFetcher {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   /**
    * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link
    * Stat#getCversion()} of state.json. If this is not modified, the same object is returned
@@ -50,4 +54,10 @@ public class PerReplicaStatesFetcher {
           e);
     }
   }
+
+  public static class LazyPrsSupplier extends DocCollection.PrsSupplier {
+    public LazyPrsSupplier(SolrZkClient zkClient, String collectionPath) {
+      super(() -> PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null));
+    }
+  }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 26239784115..d5f69e8a219 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1684,21 +1684,6 @@ public class ZkStateReader implements SolrCloseable {
       throws KeeperException, InterruptedException {
     String collectionPath = DocCollection.getCollectionPath(coll);
     while (true) {
-      ClusterState.initReplicaStateProvider(
-          () -> {
-            try {
-              PerReplicaStates replicaStates =
-                  PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
-              log.debug(
-                  "per-replica-state ver: {} fetched for initializing {} ",
-                  replicaStates.cversion,
-                  collectionPath);
-              return replicaStates;
-            } catch (Exception e) {
-              // TODO
-              throw new RuntimeException(e);
-            }
-          });
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1723,8 +1708,6 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         return null;
-      } finally {
-        ClusterState.clearReplicaStateProvider();
       }
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index 875f21a8ade..a2420ec37b2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -35,6 +35,8 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
@@ -123,6 +125,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
       params.set("collection", collection);
     }
     params.set("action", "CLUSTERSTATUS");
+    params.set("prs", "true");
     QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
     SimpleOrderedMap<?> cluster = (SimpleOrderedMap<?>) client.request(request).get("cluster");
@@ -147,7 +150,13 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
     Set<String> liveNodes = new HashSet<>((List<String>) (cluster.get("live_nodes")));
     this.liveNodes = liveNodes;
     liveNodesTimestamp = System.nanoTime();
-    ClusterState cs = ClusterState.createFromCollectionMap(znodeVersion, collectionsMap, liveNodes);
+    ClusterState cs = new ClusterState(liveNodes, new HashMap<>());
+    for (Map.Entry<String, Object> e : collectionsMap.entrySet()) {
+      @SuppressWarnings("rawtypes")
+      Map m = (Map) e.getValue();
+      cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, m));
+    }
+
     if (clusterProperties != null) {
       Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
       if (properties != null) {
@@ -157,6 +166,22 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
     return cs;
   }
 
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private DocCollection fillPrs(int znodeVersion, Map.Entry<String, Object> e, Map m) {
+    DocCollection.PrsSupplier prsSupplier = null;
+    if (m.containsKey("PRS")) {
+      Map prs = (Map) m.remove("PRS");
+      prsSupplier =
+          new DocCollection.PrsSupplier(
+              () ->
+                  new PerReplicaStates(
+                      (String) prs.get("path"),
+                      (Integer) prs.get("cversion"),
+                      (List<String>) prs.get("states")));
+    }
+    return ClusterState.collectionFromObjects(e.getKey(), m, znodeVersion, prsSupplier);
+  }
+
   @Override
   public Set<String> getLiveNodes() {
     if (liveNodes == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 3a96b8b8dcd..67adc9667dd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -26,12 +26,10 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -227,33 +225,49 @@ public class ClusterState implements JSONWriter.Writable {
    * @param liveNodes list of live nodes
    * @return the ClusterState
    */
-  public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
+  public static ClusterState createFromJson(
+      int version, byte[] bytes, Set<String> liveNodes, DocCollection.PrsSupplier prsSupplier) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> stateMap =
         (Map<String, Object>) Utils.fromJSON(bytes, 0, bytes.length, STR_INTERNER_OBJ_BUILDER);
-    return createFromCollectionMap(version, stateMap, liveNodes);
+    return createFromCollectionMap(version, stateMap, liveNodes, prsSupplier);
+  }
+
+  @Deprecated
+  public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
+    return createFromJson(version, bytes, liveNodes, null);
   }
 
   public static ClusterState createFromCollectionMap(
-      int version, Map<String, Object> stateMap, Set<String> liveNodes) {
+      int version,
+      Map<String, Object> stateMap,
+      Set<String> liveNodes,
+      DocCollection.PrsSupplier prsSupplier) {
     Map<String, CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
       String collectionName = entry.getKey();
       @SuppressWarnings({"unchecked"})
       DocCollection coll =
-          collectionFromObjects(collectionName, (Map<String, Object>) entry.getValue(), version);
+          collectionFromObjects(
+              collectionName, (Map<String, Object>) entry.getValue(), version, prsSupplier);
       collections.put(collectionName, new CollectionRef(coll));
     }
 
     return new ClusterState(collections, liveNodes);
   }
 
+  @Deprecated
+  public static ClusterState createFromCollectionMap(
+      int version, Map<String, Object> stateMap, Set<String> liveNodes) {
+    return createFromCollectionMap(version, stateMap, liveNodes, null);
+  }
+
   // TODO move to static DocCollection.loadFromMap
-  private static DocCollection collectionFromObjects(
-      String name, Map<String, Object> objs, int version) {
+  public static DocCollection collectionFromObjects(
+      String name, Map<String, Object> objs, int version, DocCollection.PrsSupplier prsSupplier) {
     Map<String, Object> props;
     Map<String, Slice> slices;
 
@@ -261,9 +275,6 @@ public class ClusterState implements JSONWriter.Writable {
       if (log.isDebugEnabled()) {
         log.debug("a collection {} has per-replica state", name);
       }
-      // this collection has replica states stored outside
-      ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
-      if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
     }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(CollectionStateProps.SHARDS);
@@ -291,7 +302,7 @@ public class ClusterState implements JSONWriter.Writable {
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return new DocCollection(name, slices, props, router, version);
+    return new DocCollection(name, slices, props, router, version, prsSupplier);
   }
 
   @Override
@@ -427,63 +438,6 @@ public class ClusterState implements JSONWriter.Writable {
     return collectionStates.size();
   }
 
-  interface ReplicaStatesProvider {
-
-    Optional<ReplicaStatesProvider> get();
-
-    PerReplicaStates getStates();
-  }
-
-  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER =
-      new ReplicaStatesProvider() {
-        @Override
-        public Optional<ReplicaStatesProvider> get() {
-          return Optional.empty();
-        }
-
-        @Override
-        public PerReplicaStates getStates() {
-          throw new RuntimeException("Invalid operation");
-        }
-      };
-
-  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
-
-  public static ReplicaStatesProvider getReplicaStatesProvider() {
-    return (REPLICASTATES_PROVIDER.get() == null)
-        ? EMPTYSTATEPROVIDER
-        : REPLICASTATES_PROVIDER.get();
-  }
-
-  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
-    REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
-  }
-
-  public static void clearReplicaStateProvider() {
-    REPLICASTATES_PROVIDER.remove();
-  }
-
-  private static class StatesProvider implements ReplicaStatesProvider {
-    private final Supplier<PerReplicaStates> replicaStatesSupplier;
-    private PerReplicaStates perReplicaStates;
-    private boolean isPerReplicaState = false;
-
-    public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
-      this.replicaStatesSupplier = replicaStatesSupplier;
-    }
-
-    @Override
-    public Optional<ReplicaStatesProvider> get() {
-      return isPerReplicaState ? Optional.of(this) : Optional.empty();
-    }
-
-    @Override
-    public PerReplicaStates getStates() {
-      if (perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
-      return perReplicaStates;
-    }
-  }
-
   private static volatile Function<JSONParser, ObjectBuilder> STR_INTERNER_OBJ_BUILDER =
       STANDARDOBJBUILDER;
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 6d4f6394d45..4de7503b403 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
+import java.util.function.Supplier;
 import org.apache.solr.common.cloud.Replica.ReplicaStateProps;
 import org.noggit.JSONWriter;
 import org.slf4j.Logger;
@@ -63,13 +64,23 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Boolean readOnly;
   private final Boolean perReplicaState;
   private final Map<String, Replica> replicaMap = new HashMap<>();
-  private volatile PerReplicaStates perReplicaStates;
+  private PrsSupplier prsSupplier;
 
+  @Deprecated
   public DocCollection(
       String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, Integer.MAX_VALUE);
+    this(name, slices, props, router, Integer.MAX_VALUE, null);
   }
 
+  @Deprecated
+  public DocCollection(
+      String name,
+      Map<String, Slice> slices,
+      Map<String, Object> props,
+      DocRouter router,
+      int zkVersion) {
+    this(name, slices, props, router, zkVersion, null);
+  }
   /**
    * @param name The name of the collection
    * @param slices The logical shards of the collection. This is used directly and a copy is not
@@ -83,7 +94,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       Map<String, Slice> slices,
       Map<String, Object> props,
       DocRouter router,
-      int zkVersion) {
+      int zkVersion,
+      PrsSupplier prsSupplier) {
     super(props);
     // -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental
     // overwrites
@@ -100,9 +112,17 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.numPullReplicas = (Integer) verifyProp(props, CollectionStateProps.PULL_REPLICAS, 0);
     this.perReplicaState =
         (Boolean) verifyProp(props, CollectionStateProps.PER_REPLICA_STATE, Boolean.FALSE);
-    ClusterState.getReplicaStatesProvider()
-        .get()
-        .ifPresent(it -> perReplicaStates = it.getStates());
+    if (this.perReplicaState) {
+      if (prsSupplier == null) {
+        throw new RuntimeException(
+            CollectionStateProps.PER_REPLICA_STATE
+                + " = true , but per-replica state supplier is not provided");
+      }
+      this.prsSupplier = prsSupplier;
+      for (Slice s : this.slices.values()) {
+        s.setPrsSupplier(prsSupplier);
+      }
+    }
     Boolean readOnly = (Boolean) verifyProp(props, CollectionStateProps.READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
 
@@ -139,30 +159,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * only a replica is updated
    */
   public DocCollection copyWith(PerReplicaStates newPerReplicaStates) {
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "collection :{} going to be updated :  per-replica state :{} -> {}",
-          name,
-          getChildNodesVersion(),
-          newPerReplicaStates.cversion);
+    if (this.prsSupplier != null) {
+      log.info("In-place update of PRS: {}", newPerReplicaStates);
+      this.prsSupplier.prs = newPerReplicaStates;
     }
-    if (getChildNodesVersion() >= newPerReplicaStates.cversion) return this;
-    Set<String> modifiedReplicas =
-        PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
-    if (modifiedReplicas.isEmpty()) return this; // nothing is modified
-    Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
-    for (String s : modifiedReplicas) {
-      Replica replica = getReplica(s);
-      if (replica != null) {
-        Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
-        Slice shard = modifiedShards.get(replica.shard);
-        modifiedShards.put(replica.shard, shard.copyWith(newReplica));
-      }
-    }
-    DocCollection result =
-        new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion);
-    result.perReplicaStates = newPerReplicaStates;
-    return result;
+    return this;
   }
 
   private void addNodeNameReplica(Replica replica) {
@@ -213,8 +214,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @return the resulting DocCollection
    */
   public DocCollection copyWithSlices(Map<String, Slice> slices) {
-    DocCollection result = new DocCollection(getName(), slices, propMap, router, znodeVersion);
-    result.perReplicaStates = perReplicaStates;
+    DocCollection result =
+        new DocCollection(getName(), slices, propMap, router, znodeVersion, prsSupplier);
     return result;
   }
   /** Return collection name. */
@@ -281,7 +282,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   }
 
   public int getChildNodesVersion() {
-    return perReplicaStates == null ? -1 : perReplicaStates.cversion;
+    PerReplicaStates prs = prsSupplier == null ? null : prsSupplier.get();
+    return prs == null ? -1 : prs.cversion;
   }
 
   public boolean isModified(int dataVersion, int childVersion) {
@@ -318,7 +320,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
         + "/"
         + znodeVersion
         + " "
-        + (perReplicaStates == null ? "" : perReplicaStates.toString())
+        + (prsSupplier == null ? "" : prsSupplier.get())
         + ")="
         + toJSONString(this);
   }
@@ -465,7 +467,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   }
 
   public PerReplicaStates getPerReplicaStates() {
-    return perReplicaStates;
+    return prsSupplier != null ? prsSupplier.get() : null;
+  }
+
+  public PrsSupplier getPrsSupplier() {
+    return prsSupplier;
   }
 
   public int getExpectedReplicaCount(Replica.Type type, int def) {
@@ -488,4 +494,27 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     String SHARDS = "shards";
     String PER_REPLICA_STATE = "perReplicaState";
   }
+
+  public static class PrsSupplier implements Supplier<PerReplicaStates> {
+
+    private volatile PerReplicaStates prs;
+
+    private Supplier<PerReplicaStates> supplier;
+
+    public PrsSupplier(Supplier<PerReplicaStates> supplier) {
+      this.supplier = supplier;
+    }
+
+    public PrsSupplier(PerReplicaStates prs) {
+      this.prs = prs;
+    }
+
+    @Override
+    public PerReplicaStates get() {
+      if (prs == null) {
+        prs = supplier.get();
+      }
+      return prs;
+    }
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index f756ea1883f..698da842845 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -32,6 +32,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.annotation.JsonProperty;
 import org.apache.solr.common.cloud.Replica.ReplicaStateProps;
@@ -302,4 +303,23 @@ public class PerReplicaStates implements ReflectMapWriter {
       return duplicate;
     }
   }
+
+  @Override
+  public void writeMap(EntryWriter ew) throws IOException {
+    ReflectMapWriter.super.writeMap(
+        new EntryWriter() {
+          @Override
+          public EntryWriter put(CharSequence k, Object v) throws IOException {
+            if ("states".equals(k.toString())) {
+              ew.put(
+                  "states",
+                  (IteratorWriter)
+                      iw -> states.forEachEntry((s, state) -> iw.addNoEx(state.toString())));
+            } else {
+              ew.put(k, v);
+            }
+            return this;
+          }
+        });
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 308d37faeca..dc90e6b975a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -137,11 +137,15 @@ public class Replica extends ZkNodeProps implements MapWriter {
   public final String core;
   public final Type type;
   public final String shard, collection;
-  private PerReplicaStates.State replicaState;
+  private DocCollection.PrsSupplier prsSupplier;
 
   // mutable
   private State state;
 
+  void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) {
+    this.prsSupplier = prsSupplier;
+  }
+
   public Replica(String name, Map<String, Object> map, String collection, String shard) {
     super(new HashMap<>());
     propMap.putAll(map);
@@ -151,7 +155,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
     this.node = (String) propMap.get(ReplicaStateProps.NODE_NAME);
     this.core = (String) propMap.get(ReplicaStateProps.CORE_NAME);
     this.type = Type.get((String) propMap.get(ReplicaStateProps.TYPE));
-    readPrs();
     // default to ACTIVE
     this.state =
         State.getState(
@@ -180,7 +183,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
     if (props != null) {
       this.propMap.putAll(props);
     }
-    readPrs();
     validate();
   }
 
@@ -202,7 +204,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
     this.node = String.valueOf(details.get("node_name"));
 
     this.propMap.putAll(details);
-    readPrs();
     type =
         Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ReplicaStateProps.TYPE, "NRT")));
     if (state == null)
@@ -211,22 +212,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
     validate();
   }
 
-  private void readPrs() {
-    ClusterState.getReplicaStatesProvider()
-        .get()
-        .ifPresent(
-            it -> {
-              log.debug("A replica  {} state fetched from per-replica state", name);
-              replicaState = it.getStates().get(name);
-              if (replicaState != null) {
-                propMap.put(
-                    ReplicaStateProps.STATE,
-                    replicaState.state.toString().toLowerCase(Locale.ROOT));
-                if (replicaState.isLeader) propMap.put(ReplicaStateProps.LEADER, "true");
-              }
-            });
-  }
-
   private final void validate() {
     Objects.requireNonNull(this.name, "'name' must not be null");
     Objects.requireNonNull(this.core, "'core' must not be null");
@@ -303,6 +288,14 @@ public class Replica extends ZkNodeProps implements MapWriter {
 
   /** Returns the {@link State} of this replica. */
   public State getState() {
+    if (prsSupplier != null) {
+      PerReplicaStates.State s = prsSupplier.get().get(name);
+      if (s != null) {
+        return s.state;
+      } else {
+        return State.DOWN;
+      }
+    }
     return state;
   }
 
@@ -312,7 +305,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
   }
 
   public boolean isActive(Set<String> liveNodes) {
-    return this.node != null && liveNodes.contains(this.node) && this.state == State.ACTIVE;
+    return this.node != null && liveNodes.contains(this.node) && getState() == State.ACTIVE;
   }
 
   public Type getType() {
@@ -320,6 +313,10 @@ public class Replica extends ZkNodeProps implements MapWriter {
   }
 
   public boolean isLeader() {
+    if (prsSupplier != null) {
+      PerReplicaStates.State st = prsSupplier.get().get(name);
+      return st == null ? false : st.isLeader;
+    }
     return getBool(ReplicaStateProps.LEADER, false);
   }
 
@@ -354,16 +351,18 @@ public class Replica extends ZkNodeProps implements MapWriter {
       if (state.isLeader) props.put(ReplicaStateProps.LEADER, "true");
     }
     Replica r = new Replica(name, props, collection, shard);
-    r.replicaState = state;
     return r;
   }
 
   public PerReplicaStates.State getReplicaState() {
-    return replicaState;
+    if (prsSupplier != null) {
+      return prsSupplier.get().get(name);
+    }
+    return null;
   }
 
   public Object clone() {
-    return new Replica(name, node, collection, shard, core, state, type, propMap);
+    return new Replica(name, node, collection, shard, core, getState(), type, propMap);
   }
 
   @Override
@@ -401,7 +400,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
           .put(ReplicaStateProps.COLLECTION, collection, p)
           .put(ReplicaStateProps.NODE_NAME, node, p)
           .put(ReplicaStateProps.TYPE, type.toString(), p)
-          .put(ReplicaStateProps.STATE, state.toString(), p);
+          .put(ReplicaStateProps.STATE, shard, p);
     };
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 5414942b480..76539e70ead 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -45,6 +45,18 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   public final String collection;
 
+  private DocCollection.PrsSupplier prsSupplier;
+
+  void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) {
+    this.prsSupplier = prsSupplier;
+    for (Replica r : replicas.values()) {
+      r.setPrsSupplier(prsSupplier);
+    }
+    if (leader == null) {
+      leader = findLeader();
+    }
+  }
+
   /**
    * Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON.
    */
@@ -132,7 +144,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   // FUTURE: optional per-slice override of the collection replicationFactor
   private final Integer replicationFactor;
   private final Map<String, Replica> replicas;
-  private final Replica leader;
+  private Replica leader;
   private final State state;
   private final String parent;
   private final Map<String, RoutingRule> routingRules;
@@ -204,8 +216,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     } else {
       this.routingRules = null;
     }
-
-    leader = findLeader();
   }
 
   @SuppressWarnings({"unchecked"})
@@ -275,7 +285,15 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   }
 
   public Replica getLeader() {
-    return leader;
+    if (prsSupplier != null) {
+      // this  is a PRS collection. leader may keep changing
+      return findLeader();
+    } else {
+      if (leader == null) {
+        leader = findLeader();
+      }
+      return leader;
+    }
   }
 
   public int getNumLeaderReplicas() {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 5be6a44e76d..44053fbeb57 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -87,7 +87,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
                 .build()) {
       livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
       ClusterState cs =
-          ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet());
+          ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet(), null);
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put(
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
index f5081ae0820..66b8cf121df 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
@@ -39,7 +39,13 @@ import org.slf4j.LoggerFactory;
 
 /** This test would be faster if we simulated the zk state instead. */
 @LogLevel(
-    "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+    "org.apache.solr.common.cloud.ZkStateReader=DEBUG;"
+        + "org.apache.solr.handler.admin.CollectionsHandler=DEBUG;"
+        + "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;"
+        + "org.apache.solr.cloud.Overseer=INFO;"
+        + "org.apache.solr.common.cloud=INFO;"
+        + "org.apache.solr.cloud.api.collections=INFO;"
+        + "org.apache.solr.cloud.overseer=INFO")
 public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());