You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/12/11 04:12:21 UTC

[lucene-solr] branch jira/solr14003 updated: eliminated all uses of Replica#getState()

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr14003
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr14003 by this push:
     new 7b80f07  eliminated all uses of Replica#getState()
7b80f07 is described below

commit 7b80f071ed3fc784d59a993f3568b0ead0907fcb
Author: noble <no...@apache.org>
AuthorDate: Wed Dec 11 15:07:24 2019 +1100

    eliminated all uses of Replica#getState()
---
 .../stream/AnalyticsShardRequestManager.java       |  4 +--
 .../org/apache/solr/cloud/ElectionContext.java     |  2 +-
 .../apache/solr/cloud/ExclusiveSliceProperty.java  |  6 +++--
 .../src/java/org/apache/solr/cloud/Overseer.java   |  2 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  5 ++--
 .../java/org/apache/solr/cloud/ZkController.java   |  9 ++++---
 .../solr/cloud/api/collections/BackupCmd.java      | 18 +++++++------
 .../cloud/api/collections/CreateCollectionCmd.java |  2 +-
 .../cloud/api/collections/CreateSnapshotCmd.java   | 18 +++++++------
 .../cloud/api/collections/DeleteCollectionCmd.java |  2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    | 18 +++++++------
 .../solr/cloud/api/collections/DeleteShardCmd.java | 18 ++++++-------
 .../cloud/api/collections/DeleteSnapshotCmd.java   | 16 ++++++-----
 .../OverseerCollectionMessageHandler.java          |  2 +-
 .../solr/cloud/api/collections/RestoreCmd.java     |  4 ++-
 .../autoscaling/sim/SimClusterStateProvider.java   |  5 ++--
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  8 +++---
 .../java/org/apache/solr/core/BlobRepository.java  |  4 ++-
 .../java/org/apache/solr/core/CoreContainer.java   | 11 +++++---
 .../src/java/org/apache/solr/core/SolrCore.java    | 10 +++++++
 .../java/org/apache/solr/handler/IndexFetcher.java | 21 +++++++--------
 .../org/apache/solr/handler/SolrConfigHandler.java |  4 ++-
 .../org/apache/solr/handler/admin/ColStatus.java   |  2 +-
 .../solr/handler/admin/CollectionsHandler.java     |  9 ++++---
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  2 +-
 .../solr/handler/component/HttpShardHandler.java   |  8 +++---
 .../org/apache/solr/schema/ManagedIndexSchema.java |  7 +++--
 .../solr/search/join/ScoreJoinQParserPlugin.java   |  6 +++--
 .../java/org/apache/solr/servlet/HttpSolrCall.java | 15 ++++++-----
 .../processor/DistributedZkUpdateProcessor.java    | 12 ++++++---
 .../test/org/apache/solr/cloud/AddReplicaTest.java | 10 ++++---
 .../cloud/AssignBackwardCompatibilityTest.java     |  4 ++-
 .../solr/cloud/ChaosMonkeyShardSplitTest.java      |  4 ++-
 .../solr/cloud/DeleteInactiveReplicaTest.java      |  2 +-
 .../org/apache/solr/cloud/DeleteReplicaTest.java   | 12 +++++----
 .../org/apache/solr/cloud/ForceLeaderTest.java     | 16 ++++++-----
 .../solr/cloud/HttpPartitionOnCommitTest.java      | 25 +++++++++++------
 .../org/apache/solr/cloud/HttpPartitionTest.java   | 16 ++++++-----
 .../cloud/LeaderFailureAfterFreshStartTest.java    |  4 ++-
 .../apache/solr/cloud/LeaderTragicEventTest.java   | 11 ++++----
 .../solr/cloud/LegacyCloudClusterPropTest.java     |  7 +++--
 .../org/apache/solr/cloud/NodeMutatorTest.java     | 17 +++++++-----
 .../apache/solr/cloud/PeerSyncReplicationTest.java |  4 ++-
 .../test/org/apache/solr/cloud/RecoveryZkTest.java | 11 +++++---
 .../org/apache/solr/cloud/ReplaceNodeTest.java     |  6 +++--
 .../cloud/SharedFSAutoReplicaFailoverTest.java     | 11 ++++----
 .../test/org/apache/solr/cloud/SyncSliceTest.java  | 19 +++++++------
 .../apache/solr/cloud/TestCloudConsistency.java    | 20 ++++++++------
 .../solr/cloud/TestCloudSearcherWarming.java       |  2 +-
 .../org/apache/solr/cloud/TestPullReplica.java     | 31 +++++++++++-----------
 .../solr/cloud/TestPullReplicaErrorHandling.java   |  8 +++---
 .../solr/cloud/TestQueryingOnDownCollection.java   |  4 ++-
 .../org/apache/solr/cloud/TestTlogReplica.java     | 18 ++++++++-----
 .../api/collections/CollectionReloadTest.java      |  9 +++++--
 .../collections/CollectionTooManyReplicasTest.java |  2 +-
 .../api/collections/CustomCollectionTest.java      |  2 +-
 .../solr/cloud/api/collections/ShardSplitTest.java |  2 +-
 .../solr/cloud/hdfs/HDFSCollectionsAPITest.java    |  2 +-
 .../core/snapshots/TestSolrCloudSnapshots.java     | 10 ++++---
 .../solr/update/TestInPlaceUpdatesDistrib.java     | 10 ++++---
 .../solr/client/solrj/cloud/DirectShardState.java  |  5 ++++
 .../client/solrj/cloud/ShardStateProvider.java     |  4 +++
 .../solrj/cloud/autoscaling/PolicyHelper.java      |  4 ++-
 .../client/solrj/impl/BaseCloudSolrClient.java     |  2 +-
 .../solrj/io/stream/FeaturesSelectionStream.java   |  6 ++---
 .../client/solrj/io/stream/TextLogitStream.java    |  5 +++-
 .../solr/client/solrj/io/stream/TopicStream.java   | 27 +++++++------------
 .../solr/client/solrj/io/stream/TupleStream.java   |  6 +++--
 .../apache/solr/common/cloud/ClusterStateUtil.java |  8 +++---
 .../common/cloud/CollectionStatePredicate.java     |  4 +--
 .../java/org/apache/solr/common/cloud/Replica.java |  1 +
 .../apache/solr/common/cloud/ZkStateReader.java    | 17 +++++++-----
 .../solr/cloud/AbstractDistribZkTestBase.java      | 27 +++++++++++--------
 .../solr/cloud/AbstractFullDistribZkTestBase.java  |  7 ++---
 .../java/org/apache/solr/cloud/ChaosMonkey.java    |  4 ++-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |  2 +-
 76 files changed, 404 insertions(+), 274 deletions(-)

diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
index a8c50ef..04ab54c 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -33,8 +33,8 @@ import org.apache.solr.analytics.AnalyticsRequestManager;
 import org.apache.solr.analytics.AnalyticsRequestParser;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -113,7 +113,7 @@ public class AnalyticsShardRequestManager {
         Collection<Replica> replicas = slice.getReplicas();
         List<Replica> shuffler = new ArrayList<>();
         for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+          if(zkStateReader.getShardStateProvider(collection).isActive(replica))
           shuffler.add(replica);
         }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 456daee..1585440 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -580,7 +580,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         ClusterState clusterState = zkStateReader.getClusterState();
         Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
         if (rep == null) return;
-        if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
+        if (zkStateReader.getShardStateProvider(collection).getState(rep) != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
           log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
           zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
index 23f684e..24e3cb2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
@@ -55,6 +55,7 @@ class ExclusiveSliceProperty {
   private final String property;
   private final DocCollection collection;
   private final String collectionName;
+  private final ZkStateReader zkStateReader;
 
   // Key structure. For each node, list all replicas on it regardless of whether they have the property or not.
   private final Map<String, List<SliceReplica>> nodesHostingReplicas = new HashMap<>();
@@ -71,8 +72,9 @@ class ExclusiveSliceProperty {
 
   private int assigned = 0;
 
-  ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
+  ExclusiveSliceProperty(ZkStateReader reader, ClusterState clusterState, ZkNodeProps message) {
     this.clusterState = clusterState;
+    this.zkStateReader = reader;
     String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
     if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
       tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
@@ -109,7 +111,7 @@ class ExclusiveSliceProperty {
   }
 
   private boolean isActive(Replica replica) {
-    return replica.getState() == Replica.State.ACTIVE;
+    return zkStateReader.getShardStateProvider(collectionName).getState(replica) == Replica.State.ACTIVE;
   }
 
   // Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 5a83a6c..ee834f2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -385,7 +385,7 @@ public class Overseer implements SolrCloseable {
           case DELETEREPLICAPROP:
             return Collections.singletonList(new ReplicaMutator(getSolrCloudManager()).deleteReplicaProperty(clusterState, message));
           case BALANCESHARDUNIQUE:
-            ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
+            ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(reader, clusterState, message);
             if (dProp.balanceProperty()) {
               String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
               return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection()));
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 39af7df..d68bc16 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -772,9 +773,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
     while (true) {
       CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
       DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(docCollection.getName());
       if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
-          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
-              .getState() == Replica.State.ACTIVE) {
+          ssp.getState(docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()))== Replica.State.ACTIVE) {
         // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
         zkController.publish(coreDesc, Replica.State.DOWN);
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index fc9af6f..1419efe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -655,6 +655,7 @@ public class ZkController implements Closeable {
     assert cd != null;
     DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName());
     if (dc == null) return;
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(cd.getCollectionName());
 
     Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
     if (shard == null) return;
@@ -663,7 +664,7 @@ public class ZkController implements Closeable {
     if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != shard.getLeader()) return;
 
     int numActiveReplicas = shard.getReplicas(
-        rep -> rep.getState() == Replica.State.ACTIVE
+        rep -> ssp.getState(rep) == Replica.State.ACTIVE
             && rep.getType() != Type.PULL
             && getClusterState().getLiveNodes().contains(rep.getNodeName())
     ).size();
@@ -1038,7 +1039,7 @@ public class ZkController implements Closeable {
         for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
           if (coreDescriptor.getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica))  {
             Replica replica = collectionState.getReplica(coreDescriptor.getCloudDescriptor().getCoreNodeName());
-            if (replica == null || replica.getState() != Replica.State.DOWN) {
+            if (replica == null || ssp.getState(replica) != Replica.State.DOWN) {
               foundStates = false;
             }
           }
@@ -1174,7 +1175,7 @@ public class ZkController implements Closeable {
       // check replica's existence in clusterstate first
       try {
         zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
-            TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+            TimeUnit.MILLISECONDS, (collectionState, ssp) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
       } catch (TimeoutException e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
       }
@@ -1825,7 +1826,7 @@ public class ZkController implements Closeable {
       AtomicReference<String> errorMessage = new AtomicReference<>();
       AtomicReference<DocCollection> collectionState = new AtomicReference<>();
       try {
-        zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
+        zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c,ssp) -> {
           collectionState.set(c);
           if (c == null)
             return false;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index ceec4e2..55da606 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -16,12 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
@@ -31,6 +25,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.lucene.util.Version;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -56,6 +51,12 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -143,16 +144,17 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     Collection<CoreSnapshotMetaData> snapshots = snapshotMeta.getReplicaSnapshotsForShard(slice.getName());
 
     Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(x -> x.isLeader()).findFirst();
+    ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(slice.collection);
     if (leaderCore.isPresent()) {
       log.info("Replica {} was the leader when snapshot {} was created.", leaderCore.get().getCoreName(), snapshotMeta.getName());
       Replica r = slice.getReplica(leaderCore.get().getCoreName());
-      if ((r != null) && !r.getState().equals(State.DOWN)) {
+      if ((r != null) && ssp.getState(r) != State.DOWN) {
         return r;
       }
     }
 
     Optional<Replica> r = slice.getReplicas().stream()
-                               .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
+                               .filter(it -> ssp.getState(it) != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), it))
                                .findFirst();
 
     if (!r.isPresent()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index bc2a0b5..4cf2722 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -322,7 +322,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
             CollectionAdminParams.COLOCATED_WITH, collectionName);
         ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
         try {
-          zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
+          zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState,ssp) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
         } catch (TimeoutException e) {
           log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
           // maybe the overseer queue is backed up, we don't want to fail the create request
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index abc3597..02e3652 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -16,12 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Date;
@@ -31,16 +25,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -53,6 +48,12 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 /**
  * This class implements the functionality of creating a collection level snapshot.
  */
@@ -94,12 +95,13 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
 
     NamedList shardRequestResults = new NamedList();
     Map<String, Slice> shardByCoreName = new HashMap<>();
+    ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(collectionName);
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
       for (Replica replica : slice.getReplicas()) {
-        if (replica.getState() != State.ACTIVE) {
+        if (ssp.getState(replica) != State.ACTIVE) {
           log.info("Replica {} is not active. Hence not sending the createsnapshot request", replica.getCoreName());
           continue; // Since replica is not active - no point sending a request.
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 648f5ba..737ef9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -147,7 +147,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
 
       // wait for a while until we don't see the collection
-      zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
+      zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState,ssp) -> collectionState == null);
 
       // we can delete any remaining unique aliases
       if (!aliasReferences.isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 83f9141..4709db9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -16,13 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
@@ -51,6 +45,13 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
 
 public class DeleteReplicaCmd implements Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -222,7 +223,8 @@ public class DeleteReplicaCmd implements Cmd {
 
     // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
     // on the command.
-    if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+    ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(collectionName);
+    if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && ssp.getState(replica) != Replica.State.DOWN) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
               "Attempted to remove replica : " + collectionName + "/"  + shard + "/" + replicaName +
               " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 4aca282..e0e827b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -17,14 +17,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -51,6 +43,14 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
 public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
@@ -141,7 +141,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       ZkStateReader zkStateReader = ocmh.zkStateReader;
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
 
-      zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
+      zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c,ssp) -> c.getSlice(sliceId) == null);
 
       log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
     } catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 634692d..9269c75 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -16,12 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -29,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -51,6 +46,12 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 /**
  * This class implements the functionality of deleting a collection level snapshot.
  */
@@ -101,9 +102,10 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
 
     final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
+    ShardStateProvider ssp = ocmh.zkStateReader.getShardStateProvider(collectionName);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
       for (Replica replica : slice.getReplicas()) {
-        if (replica.getState() == State.DOWN) {
+        if (ssp.getState(replica) == State.DOWN) {
           continue; // Since replica is down - no point sending a request.
         }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 54b7f5b..13bb2da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -415,7 +415,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
   boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
     try {
-      zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
+      zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c,ssp) -> {
           if (c == null)
             return true;
           Slice slice = c.getSlice(shard);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index aa5cbe9..9e81941 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
@@ -114,6 +115,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
         zkStateReader.getClusterState().getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM);
 
+    ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(restoreCollectionName);
     int numShards = backupCollectionState.getActiveSlices().size();
 
     int numNrtReplicas;
@@ -339,7 +341,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
           for (Replica r : s.getReplicas()) {
             String nodeName = r.getNodeName();
             String coreNodeName = r.getCoreName();
-            Replica.State stateRep = r.getState();
+            Replica.State stateRep = ssp.getState(r);
 
             log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}", nodeName, coreNodeName,
                 stateRep.name());
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 6d731a6..8d2ea0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -2216,6 +2216,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       collectionsStatesRef.set(null);
       ClusterState state = getClusterState();
       state.forEachCollection(coll -> {
+        ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(coll.getName());
         Map<String, Object> perColl = new LinkedHashMap<>();
         stats.put(coll.getName(), perColl);
         perColl.put("shardsTotal", coll.getSlices().size());
@@ -2258,11 +2259,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
           }
 
           for (Replica r : s.getReplicas()) {
-            if (r.getState() == Replica.State.ACTIVE) {
+            if (ssp.getState(r) == Replica.State.ACTIVE) {
               activeReplicas++;
             }
           }
-          Replica leader = s.getLeader();
+          Replica leader = ssp.getLeader(s);
           if (leader == null) {
             noLeader++;
             if (!s.getReplicas().isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index a70687a..9811cb34 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.CloudUtil;
@@ -380,16 +381,17 @@ public class ReplicaMutator {
   private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
     Slice slice = collection.getSlice(sliceName);
     Map<String, Object> sliceProps = slice.getProperties();
+    ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(collection.getName());
     if (slice.getState() == Slice.State.RECOVERY) {
       log.info("Shard: {} is in recovery state", sliceName);
       // is this replica active?
-      if (replica.getState() == Replica.State.ACTIVE) {
+      if (ssp.getState(replica) == Replica.State.ACTIVE) {
         log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
         // are all other replicas also active?
         boolean allActive = true;
         for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
           if (coreNodeName.equals(entry.getKey())) continue;
-          if (entry.getValue().getState() != Replica.State.ACTIVE) {
+          if (ssp.getState(entry.getValue()) != Replica.State.ACTIVE) {
             allActive = false;
             break;
           }
@@ -409,7 +411,7 @@ public class ReplicaMutator {
                 log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
                 // this is a fellow sub shard so check if all replicas are active
                 for (Map.Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
-                  if (sliceEntry.getValue().getState() != Replica.State.ACTIVE) {
+                  if (ssp.getState(sliceEntry.getValue()) != Replica.State.ACTIVE) {
                     allActive = false;
                     break outer;
                   }
diff --git a/solr/core/src/java/org/apache/solr/core/BlobRepository.java b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
index 59bd795..7d7e236 100644
--- a/solr/core/src/java/org/apache/solr/core/BlobRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
@@ -38,6 +38,7 @@ import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -238,6 +239,7 @@ public class BlobRepository {
     DocCollection coll = cs.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
     if (coll == null)
       throw new SolrException(SERVICE_UNAVAILABLE, CollectionAdminParams.SYSTEM_COLL + " collection not available");
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(CollectionAdminParams.SYSTEM_COLL);
     ArrayList<Slice> slices = new ArrayList<>(coll.getActiveSlices());
     if (slices.isEmpty())
       throw new SolrException(SERVICE_UNAVAILABLE, "No active slices for " + CollectionAdminParams.SYSTEM_COLL + " collection");
@@ -248,7 +250,7 @@ public class BlobRepository {
       List<Replica> replicas = new ArrayList<>(slice.getReplicasMap().values());
       Collections.shuffle(replicas, RANDOM);
       for (Replica r : replicas) {
-        if (r.getState() == Replica.State.ACTIVE) {
+        if (ssp.getState(r) == Replica.State.ACTIVE) {
           if (zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))) {
             replica = r;
             break;
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a9db133..fc9fd17 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -49,6 +49,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.api.AnnotatedApi;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -69,6 +70,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
@@ -1342,12 +1344,13 @@ public class CoreContainer {
       case fromleader: // Recovery from leader on a CorruptedIndexException
         if (isZooKeeperAware()) {
           CloudDescriptor desc = dcore.getCloudDescriptor();
+          ShardStateProvider ssp = getZkController().getZkStateReader().getShardStateProvider(desc.getCollectionName());
           try {
-            Replica leader = getZkController().getClusterState()
+            Slice slice = getZkController().getClusterState()
                 .getCollection(desc.getCollectionName())
-                .getSlice(desc.getShardId())
-                .getLeader();
-            if (leader != null && leader.getState() == State.ACTIVE) {
+                .getSlice(desc.getShardId());
+            Replica leader = ssp.getLeader(slice);
+            if (leader != null && ssp.getState(leader) == State.ACTIVE) {
               log.info("Found active leader, will attempt to create fresh core and recover.");
               resetIndexDirectory(dcore, coreConfig);
               // the index of this core is emptied, its term should be set to 0
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 2c6b175..5210f7c 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -74,6 +74,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.RecoveryStrategy;
@@ -238,6 +239,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   public volatile boolean searchEnabled = true;
   public volatile boolean indexEnabled = true;
   public volatile boolean readOnly = false;
+  private ShardStateProvider shardStateProvider;
 
   private PackageListeners packageListeners = new PackageListeners(this);
 
@@ -3200,6 +3202,14 @@ public final class SolrCore implements SolrInfoBean, Closeable {
     });
     return blobRef;
   }
+  public ShardStateProvider getShardStateProvider(){
+    if(this.shardStateProvider == null) {
+      if(coreContainer.getZkController() != null){
+        this.shardStateProvider = coreContainer.getZkController().getZkStateReader().getShardStateProvider(this.getCoreDescriptor().getCloudDescriptor().getCollectionName());
+      }
+    }
+    return shardStateProvider;
+  }
 
   /**
    * Run an arbitrary task in it's own thread. This is an expert option and is
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 0f873af..75ff267 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -70,15 +70,16 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -384,13 +385,17 @@ public class IndexFetcher {
     try {
       if (fetchFromLeader) {
         assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
-        Replica replica = getLeaderReplica();
         CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+        ZkStateReader zkStateReader = solrCore.getCoreContainer().getZkController().getZkStateReader();
+        ShardStateProvider ssp = zkStateReader.getShardStateProvider(cd.getCollectionName());
+        Replica replica = ssp
+            .getLeader(zkStateReader.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()));
+
         if (cd.getCoreNodeName().equals(replica.getName())) {
           return IndexFetchResult.EXPECTING_NON_LEADER;
         }
-        if (replica.getState() != Replica.State.ACTIVE) {
-          log.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
+        if (ssp.getState(replica) != Replica.State.ACTIVE) {
+          log.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), ssp.getState(replica));
           return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
         }
         if (!solrCore.getCoreContainer().getZkController().getClusterState().liveNodesContain(replica.getNodeName())) {
@@ -692,14 +697,6 @@ public class IndexFetcher {
     }
   }
 
-  private Replica getLeaderReplica() throws InterruptedException {
-    ZkController zkController = solrCore.getCoreContainer().getZkController();
-    CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
-    Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
-        cd.getCollectionName(), cd.getShardId());
-    return leaderReplica;
-  }
-
   private void cleanup(final SolrCore core, Directory tmpIndexDir,
       Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
     try {
diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
index 004da31..b4b9df0 100644
--- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
@@ -44,6 +44,7 @@ import org.apache.solr.api.ApiBag;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -823,6 +824,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
     ClusterState clusterState = zkController.getZkStateReader().getClusterState();
     Set<String> liveNodes = clusterState.getLiveNodes();
     final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
+    ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
     if (docCollection != null && docCollection.getActiveSlices() != null && docCollection.getActiveSlices().size() > 0) {
       final Collection<Slice> activeSlices = docCollection.getActiveSlices();
       for (Slice next : activeSlices) {
@@ -830,7 +832,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
         if (replicasMap != null) {
           for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
             Replica replica = entry.getValue();
-            if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+            if (ssp.isActive(replica)) {
               activeReplicaCoreUrls.add(replica.getCoreUrl());
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 434905b..e22337d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -129,7 +129,7 @@ public class ColStatus {
             downReplicas++;
             continue;
           }
-          switch (r.getState()) {
+          switch (ssp.getState(r)) {
             case ACTIVE:
               activeReplicas++;
               break;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 09fc02c..6487789 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.api.Api;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -1340,11 +1341,12 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "No shard with name " + sliceId + " exists for collection " + collectionName);
     }
+    ShardStateProvider ssp = handler.coreContainer.getZkController().getZkStateReader().getShardStateProvider(collectionName);
 
     try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
       // if an active replica is the leader, then all is fine already
-      Replica leader = slice.getLeader();
-      if (leader != null && leader.getState() == State.ACTIVE) {
+      Replica leader = ssp.getLeader(slice);
+      if (leader != null && ssp.getState(leader) == State.ACTIVE) {
         throw new SolrException(ErrorCode.SERVER_ERROR,
             "The shard already has an active leader. Force leader is not applicable. State: " + slice);
       }
@@ -1369,7 +1371,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         clusterState = handler.coreContainer.getZkController().getClusterState();
         collection = clusterState.getCollection(collectionName);
         slice = collection.getSlice(sliceId);
-        if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
+        Replica ldr = ssp.getLeader(slice);
+        if (ldr != null && ssp.getState(ldr) == State.ACTIVE) {
           success = true;
           break;
         }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index a000477..4e58b20 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -96,7 +96,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
         if (slice != null) {
           final Replica replica = slice.getReplicasMap().get(coreNodeName);
           if (replica != null) {
-            state = replica.getState();
+            state = ssp.getState(replica);
             live = n.contains(nodeName);
 
             final Replica.State localState = cloudDescriptor.getLastPublished();
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 69da394..6e4949c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -43,8 +43,8 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.LBSolrClient;
-import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
@@ -473,7 +473,7 @@ public class HttpShardHandler extends ShardHandler {
             }
           };
 
-          final List<Replica> eligibleSliceReplicas = collectEligibleReplicas(slice, clusterState, onlyNrtReplicas, isShardLeader);
+          final List<Replica> eligibleSliceReplicas = collectEligibleReplicas(rb, slice, clusterState, onlyNrtReplicas, isShardLeader);
 
           final List<String> shardUrls = transformReplicasToShardUrls(replicaListTransformer, eligibleSliceReplicas);
 
@@ -514,12 +514,12 @@ public class HttpShardHandler extends ShardHandler {
     }
   }
 
-  private static List<Replica> collectEligibleReplicas(Slice slice, ClusterState clusterState, boolean onlyNrtReplicas, Predicate<Replica> isShardLeader) {
+  private static List<Replica> collectEligibleReplicas(ResponseBuilder rb, Slice slice, ClusterState clusterState, boolean onlyNrtReplicas, Predicate<Replica> isShardLeader) {
     final Collection<Replica> allSliceReplicas = slice.getReplicasMap().values();
     final List<Replica> eligibleSliceReplicas = new ArrayList<>(allSliceReplicas.size());
     for (Replica replica : allSliceReplicas) {
       if (!clusterState.liveNodesContain(replica.getNodeName())
-          || replica.getState() != Replica.State.ACTIVE
+          || rb.req.getCore().getShardStateProvider().getState(replica) != Replica.State.ACTIVE
           || (onlyNrtReplicas && replica.getType() == Replica.Type.PULL)) {
         continue;
       }
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index 57b0c90..7165089 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -49,6 +48,7 @@ import org.apache.solr.analysis.TokenizerChain;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -287,7 +287,7 @@ public final class ManagedIndexSchema extends IndexSchema {
     List<String> activeReplicaCoreUrls = new ArrayList<>();
     ZkStateReader zkStateReader = zkController.getZkStateReader();
     ClusterState clusterState = zkStateReader.getClusterState();
-    Set<String> liveNodes = clusterState.getLiveNodes();
+    ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
     final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
     if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
       final Slice[] activeSlices = docCollection.getActiveSlicesArr();
@@ -297,8 +297,7 @@ public final class ManagedIndexSchema extends IndexSchema {
           for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
             Replica replica = entry.getValue();
             if (!localCoreNodeName.equals(replica.getName()) &&
-                replica.getState() == Replica.State.ACTIVE &&
-                liveNodes.contains(replica.getNodeName())) {
+                ssp.isActive(replica)) {
               ZkCoreNodeProps replicaCoreProps = new ZkCoreNodeProps(replica);
               activeReplicaCoreUrls.add(replicaCoreProps.getCoreUrl());
             }
diff --git a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
index 7bd78c0..3d689bd 100644
--- a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
+++ b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
@@ -26,6 +26,7 @@ import org.apache.lucene.search.QueryVisitor;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.search.join.JoinUtil;
 import org.apache.lucene.search.join.ScoreMode;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
@@ -306,6 +307,7 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
     String fromReplica = null;
 
     String nodeName = zkController.getNodeName();
+    ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(fromIndex);
     for (Slice slice : zkController.getClusterState().getCollection(fromIndex).getActiveSlicesArr()) {
       if (fromReplica != null)
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
@@ -315,10 +317,10 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
         if (replica.getNodeName().equals(nodeName)) {
           fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
           // found local replica, but is it Active?
-          if (replica.getState() != Replica.State.ACTIVE)
+          if (ssp.getState(replica) != Replica.State.ACTIVE)
             throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                 "SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
-                    ") on "+nodeName+", but it is "+replica.getState());
+                    ") on "+nodeName+", but it is "+ssp.getState(replica));
 
           break;
         }
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 9c1d357..3ffef05 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -56,6 +56,7 @@ import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.solr.api.ApiBag;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.common.SolrException;
@@ -927,24 +928,24 @@ public class HttpSolrCall {
       return null;
     }
 
-    Set<String> liveNodes = clusterState.getLiveNodes();
+    ShardStateProvider ssp = cores.getZkController().getZkStateReader().getShardStateProvider(collectionName);
 
     if (isPreferLeader) {
       List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
-      SolrCore core = randomlyGetSolrCore(liveNodes, leaderReplicas);
+      SolrCore core = randomlyGetSolrCore(ssp, leaderReplicas);
       if (core != null) return core;
     }
 
     List<Replica> replicas = collection.getReplicas(cores.getZkController().getNodeName());
-    return randomlyGetSolrCore(liveNodes, replicas);
+    return randomlyGetSolrCore(ssp, replicas);
   }
 
-  private SolrCore randomlyGetSolrCore(Set<String> liveNodes, List<Replica> replicas) {
+  private SolrCore randomlyGetSolrCore(ShardStateProvider ssp, List<Replica> replicas) {
     if (replicas != null) {
       RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
       while (it.hasNext()) {
         Replica replica = it.next();
-        if (liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
+        if (ssp.isActive(replica)) {
           SolrCore core = checkProps(replica);
           if (core != null) return core;
         }
@@ -1040,14 +1041,14 @@ public class HttpSolrCall {
     String coreUrl;
     Set<String> liveNodes = clusterState.getLiveNodes();
     Collections.shuffle(slices, random);
+    ShardStateProvider ssp = this.cores.getZkController().getZkStateReader().getShardStateProvider(collectionName);
 
     for (Slice slice : slices) {
       List<Replica> randomizedReplicas = new ArrayList<>(slice.getReplicas());
       Collections.shuffle(randomizedReplicas, random);
 
       for (Replica replica : randomizedReplicas) {
-        if (!activeReplicas || (liveNodes.contains(replica.getNodeName())
-            && replica.getState() == Replica.State.ACTIVE)) {
+        if (!activeReplicas || ssp.isActive(replica)) {
 
           if (byCoreName && !origCorename.equals(replica.getStr(CORE_NAME_PROP))) {
             // if it's by core name, make sure they match
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 569f877..58c227b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.Overseer;
@@ -71,6 +72,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.GET_LEADER_RETRY_DEFAULT_TIMEOUT;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
 public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@@ -641,14 +643,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     try {
       // Not equivalent to getLeaderProps, which  retries to find a leader.
       // Replica leader = slice.getLeader();
-      Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+      ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
+      Replica leaderReplica = ssp.getLeader(slice, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
       isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
 
       if (!isLeader) {
         isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
         if (isSubShardLeader) {
           shardId = cloudDesc.getShardId();
-          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+          leaderReplica = ssp.getLeader(slice, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
         }
       }
 
@@ -694,7 +697,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
             log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
             skippedCoreNodeNames.add(replica.getName());
-          } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
+          } else if ( !clusterState.getLiveNodes().contains(replica.getNodeName()) || ssp.getState(replica) == Replica.State.DOWN) {
             skippedCoreNodeNames.add(replica.getName());
           } else {
             nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
@@ -818,6 +821,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   }
 
   protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
+    ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(leaderReplica.collection);
     String leaderCoreNodeName = leaderReplica.getName();
     List<Replica> replicas = clusterState.getCollection(collection)
         .getSlice(shardId)
@@ -847,7 +851,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
         skippedCoreNodeNames.add(replica.getName());
       } else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
-          || replica.getState() == Replica.State.DOWN) {
+          || ssp.getState(replica) == Replica.State.DOWN) {
         skippedCoreNodeNames.add(replica.getName());
       } else {
         nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId));
diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
index 3bfda38..fe60c6f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
@@ -16,14 +16,12 @@
  */
 package org.apache.solr.cloud;
 
-import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
-import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
-
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.LinkedHashSet;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.RequestStatusState;
@@ -36,6 +34,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
+import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
+
 /**
  *
  */
@@ -137,6 +138,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
 
     ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
     DocCollection coll = clusterState.getCollection(collection);
+    ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(collection);
     String sliceName = coll.getSlices().iterator().next().getName();
     Collection<Replica> replicas = coll.getSlice(sliceName).getReplicas();
     CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName);
@@ -192,7 +194,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
       if (replica.getName().equals(replica2)) {
         continue; // may be still recovering
       }
-      assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
+      assertSame(coll.toString() + "\n" + replica.toString(),ssp.getState(replica), Replica.State.ACTIVE);
     }
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
index c1ba972..884fe2f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.cloud.DocCollection;
@@ -63,6 +64,7 @@ public class AssignBackwardCompatibilityTest extends SolrCloudTestCase {
 
     int numOperations = random().nextInt(15) + 15;
     int numLiveReplicas = 4;
+    ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(COLLECTION);
 
     boolean clearedCounter = false;
     for (int i = 0; i < numOperations; i++) {
@@ -78,7 +80,7 @@ public class AssignBackwardCompatibilityTest extends SolrCloudTestCase {
       if (deleteReplica) {
         cluster.waitForActiveCollection(COLLECTION, 1, numLiveReplicas);
         DocCollection dc = getCollectionState(COLLECTION);
-        Replica replica = getRandomReplica(dc.getSlice("shard1"), (r) -> r.getState() == Replica.State.ACTIVE);
+        Replica replica = getRandomReplica(dc.getSlice("shard1"), (r) -> ssp.getState(r)== Replica.State.ACTIVE);
         CollectionAdminRequest.deleteReplica(COLLECTION, "shard1", replica.getName()).process(cluster.getSolrClient());
         coreNames.remove(replica.getCoreName());
         numLiveReplicas--;
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 50e2443..2d2b292 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.api.collections.ShardSplitTest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
@@ -224,9 +225,10 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       DocCollection collection1 = clusterState.getCollection("collection1");
       Slice slice = collection1.getSlice("shard1");
       Collection<Replica> replicas = slice.getReplicas();
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
       boolean allActive = true;
       for (Replica replica : replicas) {
-        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+        if (!ssp.isActive(replica)) {
           allActive = false;
           break;
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
index ed01b6f..b6bec01 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
@@ -78,7 +78,7 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
 
     waitForState("Expected replica " + replica.getName() + " on down node to be removed from cluster state", collectionName, (n, c, ssp) -> {
       Replica r = c.getReplica(replica.getCoreName());
-      return r == null || r.getState() != Replica.State.ACTIVE;
+      return r == null || ssp.getState(r) != Replica.State.ACTIVE;
     });
 
     log.info("Removing replica {}/{} ", shard.getName(), replica.getName());
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index c284036..e188ccb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@@ -99,11 +100,12 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     cluster.waitForActiveCollection(collectionName, 2, 4);
 
     DocCollection state = getCollectionState(collectionName);
+    ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
     Slice shard = getRandomShard(state);
     
     // don't choose the leader to shutdown, it just complicates things unneccessarily
     Replica replica = getRandomReplica(shard, (r) ->
-                                       ( r.getState() == Replica.State.ACTIVE &&
+                                       ( shardStateProvider.getState(r) == Replica.State.ACTIVE &&
                                          ! r.equals(shard.getLeader())));
 
     CoreStatus coreStatus = getCoreStatus(replica);
@@ -239,7 +241,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
     // don't choose the leader to shutdown, it just complicates things unneccessarily
     Replica replica = getRandomReplica(shard, (r) ->
-                                       ( r.getState() == Replica.State.ACTIVE &&
+                                       ( cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName).getState(r) == Replica.State.ACTIVE &&
                                          ! r.equals(shard.getLeader())));
     
     JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
@@ -376,7 +378,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
       replica1Jetty.stop();
       waitForNodeLeave(replica1JettyNodeName);
       waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState, ssp)
-          -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+          -> ssp.getState(collectionState.getSlice("shard1").getReplica(replica1.getName())) == DOWN);
       replica1Jetty.start();
       waitingForReplicaGetDeleted.acquire();
     } finally {
@@ -405,8 +407,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
     waitForState("Expected new active leader", collectionName, (liveNodes, collectionState, ssp) -> {
       Slice shard = collectionState.getSlice("shard1");
-      Replica newLeader = shard.getLeader();
-      return newLeader != null && newLeader.getState() == Replica.State.ACTIVE && !newLeader.getName().equals(latestLeader.getName());
+      Replica newLeader = ssp.getLeader(shard);
+      return newLeader != null && ssp.getState(newLeader) == Replica.State.ACTIVE && !newLeader.getName().equals(latestLeader.getName());
     });
 
     leaderJetty.start();
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 7509e62..5950758 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import com.carrotsearch.randomizedtesting.annotations.Nightly;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -98,7 +99,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
       waitForState(testCollectionName, leader.getName(), State.DOWN, 60000);
       cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
       ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
-      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      int numActiveReplicas = getNumberOfActiveReplicas(cloudClient.getZkStateReader().getShardStateProvider(testCollectionName), clusterState, testCollectionName, SHARD1);
       assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
           "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
 
@@ -129,9 +130,10 @@ public class ForceLeaderTest extends HttpPartitionTest {
       Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
       assertNotNull(newLeader);
       // leader is active
-      assertEquals(State.ACTIVE, newLeader.getState());
+      ShardStateProvider shardStateProvider = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
+      assertEquals(State.ACTIVE, shardStateProvider.getState(newLeader));
 
-      numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      numActiveReplicas = getNumberOfActiveReplicas(shardStateProvider, clusterState, testCollectionName, SHARD1);
       assertEquals(2, numActiveReplicas);
 
       // Assert that indexing works again
@@ -204,7 +206,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
       ClusterState clusterState = zkController.getZkStateReader().getClusterState();
       boolean allDown = true;
       for (Replica replica : clusterState.getCollection(collectionName).getSlice(shard).getReplicas()) {
-        if (replica.getState() != State.DOWN) {
+        if (zkController.getZkStateReader().getShardStateProvider(collectionName).getState(replica) != State.DOWN) {
           allDown = false;
         }
       }
@@ -244,7 +246,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
     cloudClient.getZkStateReader().forceUpdateCollection(collection);
     ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
     log.info("After bringing back leader: " + clusterState.getCollection(collection).getSlice(SHARD1));
-    int numActiveReplicas = getNumberOfActiveReplicas(clusterState, collection, SHARD1);
+    int numActiveReplicas = getNumberOfActiveReplicas(cloudClient.getZkStateReader().getShardStateProvider(collection), clusterState, collection, SHARD1);
     assertEquals(1+notLeaders.size(), numActiveReplicas);
     log.info("Sending doc "+docid+"...");
     sendDoc(docid);
@@ -271,11 +273,11 @@ public class ForceLeaderTest extends HttpPartitionTest {
     }
   }
 
-  private int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
+  private int getNumberOfActiveReplicas(ShardStateProvider ssp, ClusterState clusterState, String collection, String sliceId) {
     int numActiveReplicas = 0;
     // Assert all replicas are active
     for (Replica rep : clusterState.getCollection(collection).getSlice(sliceId).getReplicas()) {
-      if (rep.getState().equals(State.ACTIVE)) {
+      if (ssp.getState(rep)== State.ACTIVE) {
         numActiveReplicas++;
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
index 8df6175..90bc857 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
@@ -16,22 +16,25 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
 import org.apache.http.NoHttpResponseException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.util.RTimer;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-
 public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -75,6 +78,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
     String testCollectionName = "c8n_2x2_commits";
     createCollection(testCollectionName, "conf1", 2, 2, 1);
     cloudClient.setDefaultCollection(testCollectionName);
+    DocCollection coll = cloudClient.getClusterStateProvider().getCollection(testCollectionName);
 
     List<Replica> notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
@@ -85,6 +89,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
 
     log.info("All replicas active for "+testCollectionName);
 
+    ShardStateProvider shardStateProvider = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
     // let's put the leader in its own partition, no replicas can contact it now
     Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
     log.info("Creating partition to leader at "+leader.getCoreUrl());
@@ -98,8 +103,9 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
     Thread.sleep(sleepMsBeforeHealPartition);
 
     cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
-    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+    leader = shardStateProvider.getLeader(coll.getSlice("shard1"), ZkStateReader.GET_LEADER_RETRY_DEFAULT_TIMEOUT) ;
+    assertSame("Leader was not active", Replica.State.ACTIVE, shardStateProvider.getState(leader));
 
     log.info("Healing partitioned replica at "+leader.getCoreUrl());
     leaderProxy.reopen();
@@ -118,6 +124,9 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
     String testCollectionName = "c8n_1x3_commits";
     createCollection(testCollectionName, "conf1", 1, 3, 1);
     cloudClient.setDefaultCollection(testCollectionName);
+    DocCollection coll = cloudClient.getZkStateReader().getCollection(testCollectionName);
+
+    ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
 
     List<Replica> notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
@@ -129,7 +138,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
     log.info("All replicas active for "+testCollectionName);
 
     // let's put the leader in its own partition, no replicas can contact it now
-    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    Replica leader = ssp.getLeader(coll.getSlice("shard1"));
     log.info("Creating partition to leader at "+leader.getCoreUrl());
 
     SocketProxy leaderProxy = getProxyForReplica(leader);
@@ -141,7 +150,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
 
     cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
     leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+    assertSame("Leader was not active", Replica.State.ACTIVE, ssp.getState(leader));
 
     log.info("Healing partitioned replica at "+leader.getCoreUrl());
     leaderProxy.reopen();
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 7a11262..3807c20 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.solr.cloud;
 
-import static org.apache.solr.common.cloud.Replica.State.DOWN;
-import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
-
 import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -32,11 +29,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -65,6 +64,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
+import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
+
 /**
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
@@ -320,7 +322,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
       Slice slice = slices.iterator().next();
       Replica partitionedReplica = slice.getReplica(replicaName);
-      replicaState = partitionedReplica.getState();
+      replicaState = cloudClient.getZkStateReader().getShardStateProvider(collection).getState(partitionedReplica);
       if (replicaState == state) return;
     }
     assertEquals("Timeout waiting for state "+ state +" of replica " + replicaName + ", current state " + replicaState,
@@ -469,10 +471,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     ZkStateReader zkr = cloudClient.getZkStateReader();
     ClusterState cs = zkr.getClusterState();
     assertNotNull(cs);
+    ShardStateProvider ssp = zkr.getShardStateProvider(testCollectionName);
     for (Slice shard : cs.getCollection(testCollectionName).getActiveSlices()) {
       if (shard.getName().equals(shardId)) {
         for (Replica replica : shard.getReplicas()) {
-          final Replica.State state = replica.getState();
+          final Replica.State state =  ssp.getState(replica);
           if (state == Replica.State.ACTIVE || state == Replica.State.RECOVERING) {
             activeReplicas.put(replica.getName(), replica);
           }
@@ -593,6 +596,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     boolean allReplicasUp = false;
     long waitMs = 0L;
     long maxWaitMs = maxWaitSecs * 1000L;
+    ShardStateProvider ssp = zkr.getShardStateProvider(testCollectionName);
     while (waitMs < maxWaitMs && !allReplicasUp) {
       cs = cloudClient.getZkStateReader().getClusterState();
       assertNotNull(cs);
@@ -607,7 +611,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
         if (!replicasToCheck.contains(replica.getName()))
           continue;
 
-        final Replica.State state = replica.getState();
+        final Replica.State state = ssp.getState(replica);
         if (state != Replica.State.ACTIVE) {
           log.info("Replica " + replica.getName() + " is currently " + state);
           allReplicasUp = false;
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
index 353c0c9..d4c78ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
@@ -32,6 +32,7 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
 import org.apache.solr.common.SolrInputDocument;
@@ -207,6 +208,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
       ZkStateReader zkStateReader = cloudClient.getZkStateReader();
       ClusterState clusterState = zkStateReader.getClusterState();
       DocCollection collection1 = clusterState.getCollection("collection1");
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
       Slice slice = collection1.getSlice("shard1");
       Collection<Replica> replicas = slice.getReplicas();
       boolean allActive = true;
@@ -221,7 +223,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
           .collect(Collectors.toList());
 
       for (Replica replica : replicasToCheck) {
-        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica) != Replica.State.ACTIVE) {
           allActive = false;
           break;
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
index 69c14cb..8004d7b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
@@ -97,7 +98,7 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
       ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), collection, 120000);
       Slice shard = getCollectionState(collection).getSlice("shard1");
       assertNotSame(shard.getLeader().getNodeName(), oldLeader.getNodeName());
-      assertEquals(getNonLeader(shard).getNodeName(), oldLeader.getNodeName());
+      assertEquals(getNonLeader(shard, cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection) ).getNodeName(), oldLeader.getNodeName());
 
       for (String id : addedIds) {
         assertNotNull(cluster.getSolrClient().getById(collection,id));
@@ -157,9 +158,9 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
     return oldLeader;
   }
 
-  private Replica getNonLeader(Slice slice) {
+  private Replica getNonLeader(Slice slice, ShardStateProvider ssp) {
     if (slice.getReplicas().size() <= 1) return null;
-    return slice.getReplicas(rep -> !rep.getName().equals(slice.getLeader().getName())).get(0);
+    return slice.getReplicas(rep -> !rep.getName().equals(ssp.getLeader(slice).getName())).get(0);
   }
 
   @Test
@@ -177,11 +178,11 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
       JettySolrRunner otherReplicaJetty = null;
       if (numReplicas == 2) {
         Slice shard = getCollectionState(collection).getSlice("shard1");
-        otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard));
+        otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard, cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection)));
         log.info("Stop jetty node : {} state:{}", otherReplicaJetty.getBaseUrl(), getCollectionState(collection));
         otherReplicaJetty.stop();
         cluster.waitForJettyToStop(otherReplicaJetty);
-        waitForState("Timeout waiting for replica get down", collection, (liveNodes, collectionState, ssp) -> getNonLeader(collectionState.getSlice("shard1")).getState() != Replica.State.ACTIVE);
+        waitForState("Timeout waiting for replica get down", collection, (liveNodes, collectionState, ssp) -> ssp.getState(getNonLeader(collectionState.getSlice("shard1"), ssp)) != Replica.State.ACTIVE);
       }
 
       Replica oldLeader = corruptLeader(collection, new ArrayList<>());
diff --git a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
index a91d5f7..37f93d0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Properties;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.ClusterProperties;
@@ -124,10 +125,12 @@ public class LegacyCloudClusterPropTest extends SolrCloudTestCase {
   }
 
   private void checkCollectionActive(String coll) {
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
     DocCollection docColl = getCollectionState(coll);
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(coll);
     for (Replica rep : docColl.getReplicas()) {
-      if (rep.getState() == Replica.State.ACTIVE) return;
+      if (ssp.getState(rep) == Replica.State.ACTIVE) return;
     }
     fail("Replica was not active for collection " + coll);
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index a446f29..4412cb5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.solr.SolrTestCaseJ4Test;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.cloud.overseer.NodeMutator;
 import org.apache.solr.cloud.overseer.ZkWriteCommand;
 import org.apache.solr.common.cloud.ClusterState;
@@ -48,6 +49,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
     //Collection2: 1 shard X 1 replica = replica1 on node2
     ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2", 1, 1, NODE1, NODE2);
     ClusterState clusterState = reader.getClusterState();
+    ShardStateProvider ssp = reader.getShardStateProvider("collection1");
     assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
     assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
     assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
@@ -56,8 +58,8 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
     List<ZkWriteCommand> writes = nm.downNode(clusterState, props);
     assertEquals(writes.size(), 1);
     assertEquals(writes.get(0).name, "collection1");
-    assertEquals(writes.get(0).collection.getReplica("replica1").getState(), Replica.State.DOWN);
-    assertEquals(writes.get(0).collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
+    assertEquals(ssp.getState(writes.get(0).collection.getReplica("replica1")), Replica.State.DOWN);
+    assertEquals(ssp.getState(writes.get(0).collection.getReplica("replica2")), Replica.State.ACTIVE);
     reader.close();
 
     //We use 3 nodes with maxShardsPerNode as 1
@@ -68,6 +70,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
     clusterState = reader.getClusterState();
     assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
     assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
+    ssp = reader.getShardStateProvider("collection2");
 
     assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
 
@@ -79,12 +82,12 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
     assertEquals(writes.size(), 2);
     for (ZkWriteCommand write : writes) {
       if (write.name.equals("collection1")) {
-        assertEquals(write.collection.getReplica("replica1").getState(), Replica.State.DOWN);
-        assertEquals(write.collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
+        assertEquals(ssp.getState(write.collection.getReplica("replica1")), Replica.State.DOWN);
+        assertEquals(ssp.getState(write.collection.getReplica("replica2")), Replica.State.ACTIVE);
       } else if (write.name.equals("collection3")) {
-        assertEquals(write.collection.getReplica("replica5").getState(), Replica.State.DOWN);
-        assertEquals(write.collection.getReplica("replica6").getState(), Replica.State.ACTIVE);
-        assertEquals(write.collection.getReplica("replica7").getState(), Replica.State.ACTIVE);
+        assertEquals(ssp.getState(write.collection.getReplica("replica5")), Replica.State.DOWN);
+        assertEquals(ssp.getState(write.collection.getReplica("replica6")), Replica.State.ACTIVE);
+        assertEquals(ssp.getState(write.collection.getReplica("replica7")), Replica.State.ACTIVE);
       } else {
         fail("No other collection needs to be changed");
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index 5169d22..eea5cdf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
 import org.apache.solr.common.SolrInputDocument;
@@ -339,6 +340,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
       ZkStateReader zkStateReader = cloudClient.getZkStateReader();
       ClusterState clusterState = zkStateReader.getClusterState();
       DocCollection collection1 = clusterState.getCollection("collection1");
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider("collection1");
       Slice slice = collection1.getSlice("shard1");
       Collection<Replica> replicas = slice.getReplicas();
       boolean allActive = true;
@@ -354,7 +356,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
               .collect(Collectors.toList());
 
       for (Replica replica : replicasToCheck) {
-        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica) != Replica.State.ACTIVE) {
           allActive = false;
           break;
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
index 5693330..a8ec1c9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -102,7 +103,8 @@ public class RecoveryZkTest extends SolrCloudTestCase {
      
     // bring shard replica down
     DocCollection state = getCollectionState(collection);
-    Replica leader = state.getLeader("shard1");
+    ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
+    Replica leader = ssp.getLeader(state.getSlice("shard1"));
     Replica replica = getRandomReplica(state.getSlice("shard1"), (r) -> leader != r);
 
     JettySolrRunner jetty = cluster.getReplicaJetty(replica);
@@ -131,12 +133,13 @@ public class RecoveryZkTest extends SolrCloudTestCase {
 
     // test that leader and replica have same doc count
     state = getCollectionState(collection);
-    assertShardConsistency(state.getSlice("shard1"), true);
+
+    assertShardConsistency(ssp, state.getSlice("shard1"), true);
 
   }
 
-  private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
-    List<Replica> replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE);
+  private void assertShardConsistency(ShardStateProvider ssp, Slice shard, boolean expectDocs) throws Exception {
+    List<Replica> replicas = shard.getReplicas(r -> ssp.getState(r)== Replica.State.ACTIVE);
     long[] numCounts = new long[replicas.size()];
     int i = 0;
     for (Replica replica : replicas) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 0412330..f46d608 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -89,6 +90,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     cluster.waitForActiveCollection(coll, 5, 5 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
     
     DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
+    ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(coll);
     log.debug("### Before decommission: " + collection);
     log.info("excluded_node : {}  ", emptyNode);
     createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient);
@@ -159,13 +161,13 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     });
     assertFalse(newReplicas.isEmpty());
     for (Replica r : newReplicas) {
-      assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
+      assertEquals(r.toString(), Replica.State.ACTIVE, ssp.getState(r));
     }
     // make sure all replicas on emptyNode are not active
     replicas = collection.getReplicas(emptyNode);
     if (replicas != null) {
       for (Replica r : replicas) {
-        assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
+        assertFalse(r.toString(), Replica.State.ACTIVE.equals(ssp.getState(r)));
       }
     }
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
index 735cc20..9090392 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
@@ -32,13 +32,15 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
-import com.carrotsearch.randomizedtesting.annotations.Nightly;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@@ -58,16 +60,14 @@ import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -398,12 +398,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
     while (!timeOut.hasTimedOut()) {
       ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
       Collection<Slice> slices = clusterState.getCollection(collection).getActiveSlices();
+      ShardStateProvider ssp = cloudClient.getClusterStateProvider().getShardStateProvider(collection);
       if (slices.size() == numSlices) {
         boolean isMatch = true;
         for (Slice slice : slices) {
           int count = 0;
           for (Replica replica : slice.getReplicas()) {
-            if (replica.getState() == Replica.State.ACTIVE && clusterState.liveNodesContain(replica.getNodeName())) {
+            if (ssp.isActive(replica)) {
               count++;
             }
           }
diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
index c48f22e..7f78516 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
@@ -16,10 +16,18 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -33,13 +41,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 /**
  * Test sync phase that occurs when Leader goes down and a new Leader is
  * elected.
@@ -217,11 +218,13 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
       ZkStateReader zkStateReader = cloudClient.getZkStateReader();
       ClusterState clusterState = zkStateReader.getClusterState();
       DocCollection collection1 = clusterState.getCollection("collection1");
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
+
       Slice slice = collection1.getSlice("shard1");
       Collection<Replica> replicas = slice.getReplicas();
       boolean allActive = true;
       for (Replica replica : replicas) {
-        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica)!= Replica.State.ACTIVE) {
           allActive = false;
           break;
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
index 2f68ad9..56a071d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -155,16 +156,16 @@ public class TestCloudConsistency extends SolrCloudTestCase {
     j2.stop();
     cluster.waitForJettyToStop(j1);
     cluster.waitForJettyToStop(j2);
-    
+
     waitForState("", collection, (liveNodes, collectionState, ssp) ->
       collectionState.getSlice("shard1").getReplicas().stream()
-          .filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
+          .filter(replica -> ssp.getState(replica) == Replica.State.DOWN).count() == 2);
 
     addDocs(collection, 1, docId);
     JettySolrRunner j3 = cluster.getJettySolrRunner(0);
     j3.stop();
     cluster.waitForJettyToStop(j3);
-    waitForState("", collection, (liveNodes, collectionState, ssp) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+    waitForState("", collection, (liveNodes, collectionState, ssp) -> ssp.getState(collectionState.getReplica(leader.getName())) == Replica.State.DOWN);
 
     cluster.getJettySolrRunner(1).start();
     cluster.getJettySolrRunner(2).start();
@@ -173,9 +174,11 @@ public class TestCloudConsistency extends SolrCloudTestCase {
     cluster.waitForNode(j2, 30);
     
     TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
+
     while (!timeOut.hasTimedOut()) {
-      Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader();
-      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+      Replica newLeader = shardStateProvider.getLeader(getCollectionState(collection).getSlice("shard1"));
+      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && shardStateProvider.getState(newLeader) == Replica.State.ACTIVE) {
         fail("Out of sync replica became leader " + newLeader);
       }
     }
@@ -212,12 +215,13 @@ public class TestCloudConsistency extends SolrCloudTestCase {
       proxies.get(cluster.getJettySolrRunner(i)).reopen();
     }
     waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState, ssp)
-        -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+        -> ssp.getState(collectionState.getReplica(leader.getName())) == Replica.State.DOWN);
 
+    ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
     TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     while (!timeOut.hasTimedOut()) {
-      Replica newLeader = getCollectionState(collection).getLeader("shard1");
-      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+      Replica newLeader = shardStateProvider.getLeader( getCollectionState(collection).getSlice("shard1"));
+      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && shardStateProvider.getState(newLeader) == Replica.State.ACTIVE) {
         fail("Out of sync replica became leader " + newLeader);
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
index 5e5a11e..b25ccb3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
@@ -197,7 +197,7 @@ public class TestCloudSearcherWarming extends SolrCloudTestCase {
     CollectionStatePredicate collectionStatePredicate = (liveNodes, collectionState, ssp) -> {
       for (Replica r : collectionState.getReplicas()) {
         if (r.getNodeName().equals(oldNodeName.get())) {
-          return r.getState() == Replica.State.DOWN;
+          return ssp.getState(r) == Replica.State.DOWN;
         }
       }
       return false;
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index e8abffe..e9fbf66 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -333,9 +333,9 @@ public class TestPullReplica extends SolrCloudTestCase {
       if (r == null) {
         return false;
       }
-      statesSeen.add(r.getState());
-      log.info("CollectionStateWatcher saw state: {}", r.getState());
-      return r.getState() == Replica.State.ACTIVE;
+      statesSeen.add(ssp.getState(r));
+      log.info("CollectionStateWatcher saw state: {}", ssp.getState(r));
+      return ssp.getState(r) == Replica.State.ACTIVE;
     });
     CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.PULL).process(cluster.getSolrClient());
     waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
@@ -509,10 +509,10 @@ public class TestPullReplica extends SolrCloudTestCase {
     DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
     assertEquals(1, docCollection.getSlices().size());
 
-    waitForNumDocsInAllActiveReplicas(0);
+    waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 0);
     cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
     cluster.getSolrClient().commit(collectionName);
-    waitForNumDocsInAllActiveReplicas(1);
+    waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 1);
 
     JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
     pullReplicaJetty.stop();
@@ -522,20 +522,20 @@ public class TestPullReplica extends SolrCloudTestCase {
 
     cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
     cluster.getSolrClient().commit(collectionName);
-    waitForNumDocsInAllActiveReplicas(2);
+    waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 2);
 
     pullReplicaJetty.start();
     waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
-    waitForNumDocsInAllActiveReplicas(2);
+    waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 2);
   }
 
   public void testSearchWhileReplicationHappens() {
 
   }
 
-  private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
+  private void waitForNumDocsInAllActiveReplicas(ShardStateProvider ssp, int numDocs) throws IOException, SolrServerException, InterruptedException {
     DocCollection docCollection = getCollectionState(collectionName);
-    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()));
+    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> ssp.getState(r) == Replica.State.ACTIVE).collect(Collectors.toList()));
   }
 
   private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas) throws IOException, SolrServerException, InterruptedException {
@@ -584,14 +584,15 @@ public class TestPullReplica extends SolrCloudTestCase {
     if (updateCollection) {
       cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
     }
+    ShardStateProvider ssp = cluster.getSolrClient().getZkStateReader().getShardStateProvider(collectionName);
     DocCollection docCollection = getCollectionState(collectionName);
     assertNotNull(docCollection);
     assertEquals("Unexpected number of writer replicas: " + docCollection, numNrtReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r)== Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of active replicas: " + docCollection, numTlogReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     return docCollection;
   }
 
@@ -601,13 +602,13 @@ public class TestPullReplica extends SolrCloudTestCase {
   private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
     return (liveNodes, collectionState, ssp) -> {
       for (Replica r:collectionState.getReplicas()) {
-        if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+        if (ssp.getState(r) != Replica.State.DOWN && ssp.getState(r) != Replica.State.ACTIVE) {
           return false;
         }
-        if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+        if (ssp.getState(r) == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
           return false;
         }
-        if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+        if (ssp.getState(r) == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
           return false;
         }
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
index eb4e26a..ab49c2c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -32,6 +32,7 @@ import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -273,13 +274,14 @@ public void testCantConnectToPullReplica() throws Exception {
       cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
     }
     DocCollection docCollection = getCollectionState(collectionName);
+    ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
     assertNotNull(docCollection);
     assertEquals("Unexpected number of writer replicas: " + docCollection, numWriter, 
-        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r)  == Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of pull replicas: " + docCollection, numPassive, 
-        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of active replicas: " + docCollection, numActive, 
-        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     return docCollection;
   }
   
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java b/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
index 1cd70f4..a6e6edd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
@@ -74,9 +75,10 @@ public class TestQueryingOnDownCollection extends SolrCloudTestCase {
     downAllReplicas();
 
     // assert all replicas are in down state
+    ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(COLLECTION_NAME);
     List<Replica> replicas = getCollectionState(COLLECTION_NAME).getReplicas();
     for (Replica replica: replicas){
-      assertEquals(replica.getState(), Replica.State.DOWN);
+      assertEquals(ssp.getState(replica), Replica.State.DOWN);
     }
 
     // assert all nodes as active
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
index 5e59b7c..3c845aa 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
@@ -766,7 +766,8 @@ public class TestTlogReplica extends SolrCloudTestCase {
 
   private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException {
     DocCollection docCollection = getCollectionState(collectionName);
-    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()), timeout);
+    ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
+    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> ssp.getState(r) == Replica.State.ACTIVE).collect(Collectors.toList()), timeout);
   }
 
   private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, int timeout) throws IOException, SolrServerException, InterruptedException {
@@ -815,17 +816,19 @@ public class TestTlogReplica extends SolrCloudTestCase {
   }
 
   private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+
     if (updateCollection) {
       cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
     }
+    ShardStateProvider ssp = cluster.getSolrClient().getZkStateReader().getShardStateProvider(collectionName);
     DocCollection docCollection = getCollectionState(collectionName);
     assertNotNull(docCollection);
     assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas,
-        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
     return docCollection;
   }
 
@@ -835,13 +838,14 @@ public class TestTlogReplica extends SolrCloudTestCase {
   private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
     return (liveNodes, collectionState, ssp) -> {
       for (Replica r:collectionState.getReplicas()) {
-        if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+        if (ssp.getState(r) != Replica.State.DOWN && ssp.getState(r) != Replica.State.ACTIVE) {
           return false;
         }
-        if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+        if (ssp.getState(r) == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
           return false;
         }
-        if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+        if (ssp.getState(r)  == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+
           return false;
         }
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
index 041ccec..02e445f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
@@ -23,6 +23,7 @@ import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.RetryUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -53,8 +54,12 @@ public class CollectionReloadTest extends SolrCloudTestCase {
     CollectionAdminRequest.createCollection(testCollectionName, "conf", 1, 1)
         .process(cluster.getSolrClient());
 
+
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
     Replica leader
-        = cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", DEFAULT_TIMEOUT);
+        = zkStateReader.getShardStateProvider(testCollectionName)
+        .getLeader(zkStateReader.getCollection(testCollectionName)
+            .getSlice("shard1"), DEFAULT_TIMEOUT);
 
     long coreStartTime = getCoreStatus(leader).getCoreStartTime().getTime();
     CollectionAdminRequest.reloadCollection(testCollectionName).process(cluster.getSolrClient());
@@ -77,7 +82,7 @@ public class CollectionReloadTest extends SolrCloudTestCase {
     waitForState("Timed out waiting for core to re-register as ACTIVE after session expiry", testCollectionName, (n, c, ssp) -> {
       log.info("Collection state: {}", c.toString());
       Replica expiredReplica = c.getReplica(leader.getName());
-      return expiredReplica.getState() == Replica.State.ACTIVE && c.getZNodeVersion() > initialStateVersion;
+      return ssp.getState(expiredReplica) == Replica.State.ACTIVE && c.getZNodeVersion() > initialStateVersion;
     });
 
     log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!");
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
index 04c39ec..1d8860c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
@@ -110,7 +110,7 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
     // wait for recoveries to finish, for a clean shutdown - see SOLR-9645
     waitForState("Expected to see all replicas active", collectionName, (n, c, ssp) -> {
       for (Replica r : c.getReplicas()) {
-        if (r.getState() != Replica.State.ACTIVE)
+        if (ssp.getState(r) != Replica.State.ACTIVE)
           return false;
       }
       return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
index 6e8c873..b295a1f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
@@ -107,7 +107,7 @@ public class CustomCollectionTest extends SolrCloudTestCase {
       if (c.getSlice("x") == null)
         return false;
       for (Replica r : c.getSlice("x")) {
-        if (r.getState() != Replica.State.ACTIVE)
+        if (ssp.getState(r) != Replica.State.ACTIVE)
           return false;
       }
       return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 95273ea..d049af6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -227,7 +227,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
               }
               Slice slice = collectionState.getSlice(SHARD1_0);
               if (slice.getReplicas().size() == 2)  {
-                if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+                if (slice.getReplicas().stream().noneMatch(r -> ssp.getState(r)  == Replica.State.RECOVERING)) {
                   // we see replicas and none of them are recovering
                   newReplicaLatch.countDown();
                   return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
index c6bcf54..333fc55 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
@@ -79,7 +79,7 @@ public class HDFSCollectionsAPITest extends SolrCloudTestCase {
     jettySolrRunner.stop();
     waitForState("", collection, (liveNodes, collectionState, ssp) -> {
       Replica replica = collectionState.getSlice("shard1").getReplicas().iterator().next();
-      return replica.getState() == Replica.State.DOWN;
+      return ssp.getState(replica) == Replica.State.DOWN;
     });
     CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
index f0bae3b..42bc64b 100644
--- a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
@@ -16,8 +16,6 @@
  */
 package org.apache.solr.core.snapshots;
 
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,10 +25,11 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.ListSnapshots;
@@ -53,6 +52,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
 @SolrTestCaseJ4.SuppressSSL // Currently unknown why SSL does not work with this test
 @Slow
 public class TestSolrCloudSnapshots extends SolrCloudTestCase {
@@ -112,9 +113,10 @@ public class TestSolrCloudSnapshots extends SolrCloudTestCase {
 
       // Figure out if at-least one replica is "down".
       DocCollection collState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+      ShardStateProvider ssp = solrClient.getZkStateReader().getShardStateProvider(collectionName);
       for (Slice s : collState.getSlices()) {
         for (Replica replica : s.getReplicas()) {
-          if (replica.getState() == State.DOWN) {
+          if (ssp.getState(replica) == State.DOWN) {
             stoppedCoreName = Optional.of(replica.getCoreName());
           }
         }
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index 88ef94d..dcfb144 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -37,6 +37,7 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -994,10 +995,11 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
       Thread.sleep(10);
       cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
       ClusterState state = cloudClient.getZkStateReader().getClusterState();
+      ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
 
       int numActiveReplicas = 0;
       for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
-        if (rep.getState().equals(Replica.State.ACTIVE))
+        if (ssp.getState(rep).equals(Replica.State.ACTIVE))
           numActiveReplicas++;
 
       assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
@@ -1066,11 +1068,12 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
           Thread.sleep(10);
           cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
           ClusterState state = cloudClient.getZkStateReader().getClusterState();
+          ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
 
           int numActiveReplicas = 0;
           for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
             assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
-            if (rep.getState().equals(Replica.State.ACTIVE))
+            if (ssp.getState(rep).equals(Replica.State.ACTIVE))
               numActiveReplicas++;
           }
           assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
@@ -1320,10 +1323,11 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
       Thread.sleep(10);
       cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
       ClusterState state = cloudClient.getZkStateReader().getClusterState();
+      ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
 
       int numActiveReplicas = 0;
       for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
-        if (rep.getState().equals(Replica.State.ACTIVE))
+        if (ssp.getState(rep).equals(Replica.State.ACTIVE))
           numActiveReplicas++;
 
       assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
index 16b610f..b2f8f18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
@@ -54,4 +54,9 @@ public class DirectShardState implements ShardStateProvider {
   public boolean isActive(Slice slice) {
     return slice.getState() == Slice.State.ACTIVE;
   }
+
+  @Override
+  public Replica getLeader(Slice slice, int timeout) throws InterruptedException {
+    throw new RuntimeException("Not implemented");//TODO
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
index 74d9d74..3a5ab8e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
@@ -29,6 +29,10 @@ public interface ShardStateProvider {
 
   Replica getLeader(Slice slice);
 
+  /**Gete the leader of the slice. Wait for one if there is no leader
+   */
+  Replica getLeader(Slice slice, int timeout) throws InterruptedException;
+
   boolean isActive(Replica replica);
 
   boolean isActive(Slice slice);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 955295a..954b9c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -37,6 +37,7 @@ import java.util.function.BiPredicate;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
@@ -295,10 +296,11 @@ public class PolicyHelper {
 
   private static void addMissingReplicas(SolrCloudManager cloudManager, Suggestion.Ctx ctx) throws IOException {
     cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> coll.forEach(slice -> {
+      ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(coll.getName());
       if (!ctx.needMore()) return;
           ReplicaCount replicaCount = new ReplicaCount();
           slice.forEach(replica -> {
-            if (replica.getState() == Replica.State.ACTIVE || replica.getState() == Replica.State.RECOVERING) {
+            if (ssp.getState(replica) == Replica.State.ACTIVE || ssp.getState(replica) == Replica.State.RECOVERING) {
               replicaCount.increment(replica.getType());
             }
           });
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 2f6a48a..4705554 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -411,7 +411,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
   public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
       throws InterruptedException, TimeoutException {
     getClusterStateProvider().connect();
-    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, (d, ssp) -> predicate.test(d));
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 22b807f..1fdd5eb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -28,12 +28,12 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -260,16 +260,16 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
       ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
 
       Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
 
       ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
 
       List<String> baseUrls = new ArrayList<>();
       for(Slice slice : slices) {
         Collection<Replica> replicas = slice.getReplicas();
         List<Replica> shuffler = new ArrayList<>();
         for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+          if(ssp.isActive(replica)) {
             shuffler.add(replica);
           }
         }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index a493f5a..9b84137 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.ClassificationEvaluation;
@@ -346,12 +347,14 @@ public class TextLogitStream extends TupleStream implements Expressible {
       ClusterState clusterState = zkStateReader.getClusterState();
       Set<String> liveNodes = clusterState.getLiveNodes();
 
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
+
       List<String> baseUrls = new ArrayList<>();
       for(Slice slice : slices) {
         Collection<Replica> replicas = slice.getReplicas();
         List<Replica> shuffler = new ArrayList<>();
         for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+          if(ssp.isActive(replica)) {
             shuffler.add(replica);
           }
         }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index b42ab77..20297e8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -25,15 +25,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -50,7 +50,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -397,16 +396,13 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
 
     Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
 
-    ClusterState clusterState = zkStateReader.getClusterState();
-    Set<String> liveNodes = clusterState.getLiveNodes();
-
     for(Slice slice : slices) {
       String sliceName = slice.getName();
       long checkpoint;
       if(initialCheckpoint > -1) {
         checkpoint = initialCheckpoint;
       } else {
-        checkpoint = getCheckpoint(slice, liveNodes);
+        checkpoint = getCheckpoint(zkStateReader.getShardStateProvider(this.collection), slice);
       }
 
       this.checkpoints.put(sliceName, checkpoint);
@@ -414,7 +410,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
   }
 
   //Gets the highest version number for the slice.
-  private long getCheckpoint(Slice slice, Set<String> liveNodes) throws IOException {
+  private long getCheckpoint(ShardStateProvider ssp,  Slice slice) throws IOException {
     Collection<Replica> replicas = slice.getReplicas();
     long checkpoint = -1;
     ModifiableSolrParams params = new ModifiableSolrParams();
@@ -423,7 +419,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     params.set(DISTRIB, "false");
     params.set("rows", 1);
     for(Replica replica : replicas) {
-      if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+      if(ssp.isActive(replica)) {
         String coreUrl = replica.getCoreUrl();
         SolrStream solrStream = new SolrStream(coreUrl, params);
 
@@ -476,14 +472,12 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
     ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
     Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
 
-    ClusterState clusterState = zkStateReader.getClusterState();
-    Set<String> liveNodes = clusterState.getLiveNodes();
-
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(checkpointCollection);
     OUTER:
     for(Slice slice : slices) {
       Collection<Replica> replicas = slice.getReplicas();
       for(Replica replica : replicas) {
-        if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
+        if(ssp.isActive(replica)){
           HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
           try {
             SolrDocument doc = httpClient.getById(id);
@@ -518,10 +512,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
       mParams.set("fl", fl);
 
       Random random = new Random();
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
       for(Slice slice : slices) {
         ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
         long checkpoint = checkpoints.get(slice.getName());
@@ -529,7 +520,7 @@ public class TopicStream extends CloudSolrStream implements Expressible  {
         Collection<Replica> replicas = slice.getReplicas();
         List<Replica> shuffler = new ArrayList<>();
         for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+          if(ssp.isActive(replica))
             shuffler.add(replica);
         }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 086bddd..a5c6c49 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -24,11 +24,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.Map;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -132,6 +133,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
       //SolrCloud Sharding
       CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
       ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+      ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
       ClusterState clusterState = zkStateReader.getClusterState();
       Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
       Set<String> liveNodes = clusterState.getLiveNodes();
@@ -139,7 +141,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
         Collection<Replica> replicas = slice.getReplicas();
         List<Replica> shuffler = new ArrayList<>();
         for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+          if(ssp.isActive(replica))
             shuffler.add(replica);
         }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
index 5e61bc1..a6ee609 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
@@ -17,12 +17,12 @@
 package org.apache.solr.common.cloud;
 
 import java.lang.invoke.MethodHandles;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.slf4j.Logger;
@@ -61,6 +61,7 @@ public class ClusterStateUtil {
     long timeout = System.nanoTime()
         + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
     boolean success = false;
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
     while (!success && System.nanoTime() < timeout) {
       success = true;
       ClusterState clusterState = zkStateReader.getClusterState();
@@ -81,7 +82,7 @@ public class ClusterStateUtil {
               for (Replica replica : replicas) {
                 // on a live node?
                 final boolean live = clusterState.liveNodesContain(replica.getNodeName());
-                final boolean isActive = replica.getState() == Replica.State.ACTIVE;
+                final boolean isActive = ssp.getState(replica) == Replica.State.ACTIVE;
                 if (!live || !isActive) {
                   // fail
                   success = false;
@@ -219,11 +220,12 @@ public class ClusterStateUtil {
   public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
     Slice[] slices;
     slices = zkStateReader.getClusterState().getCollection(collection).getActiveSlicesArr();
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
     int liveAndActive = 0;
     for (Slice slice : slices) {
       for (Replica replica : slice.getReplicas()) {
         boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
-        boolean active = replica.getState() == Replica.State.ACTIVE;
+        boolean active = ssp.getState(replica)== Replica.State.ACTIVE;
         if (live && active) {
           liveAndActive++;
         }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
index d3ad502..06211da 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
@@ -19,7 +19,7 @@ package org.apache.solr.common.cloud;
 
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
+import java.util.function.BiPredicate;
 
 import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 
@@ -27,7 +27,7 @@ import org.apache.solr.client.solrj.cloud.ShardStateProvider;
  * Interface to determine if a set of liveNodes and a collection's state matches some expecatations.
  *
  * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
- * @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
+ * @see ZkStateReader#waitForState(String, long, TimeUnit, BiPredicate)
  */
 public interface CollectionStatePredicate {
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 21d59fe..f70a815 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -168,6 +168,7 @@ public class Replica extends ZkNodeProps {
   }
   
   /** Returns the {@link State} of this replica. */
+  @Deprecated
   public State getState() {
     return state;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 352a716..4e95aa6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -158,8 +159,7 @@ public class ZkStateReader implements SolrCloseable {
   protected volatile ClusterState clusterState;
 
   private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
-  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));
-  ;
+  public static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));
 
   public static final String LEADER_ELECT_ZKNODE = "leader_elect";
 
@@ -219,7 +219,12 @@ public class ZkStateReader implements SolrCloseable {
 
   private Set<ClusterPropertiesListener> clusterPropertiesListeners = ConcurrentHashMap.newKeySet();
 
-  private final ShardStateProvider directReplicaState = new DirectShardState(s -> liveNodes.contains(s));
+  private final ShardStateProvider directReplicaState = new DirectShardState(s -> liveNodes.contains(s)) {
+    @Override
+    public Replica getLeader(Slice slice, int timeout) throws InterruptedException {
+      return getLeaderRetry(slice.collection, slice.getName(), timeout);
+    }
+  };
 
   /**
    * Used to submit notifications to Collection Properties watchers in order
@@ -1736,7 +1741,7 @@ public class ZkStateReader implements SolrCloseable {
    * @param predicate  the predicate to call on state changes
    * @throws InterruptedException on interrupt
    * @throws TimeoutException     on timeout
-   * @see #waitForState(String, long, TimeUnit, Predicate)
+   * @see #waitForState(String, long, TimeUnit, BiPredicate)
    * @see #registerCollectionStateWatcher
    */
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
@@ -1785,7 +1790,7 @@ public class ZkStateReader implements SolrCloseable {
    * @throws InterruptedException on interrupt
    * @throws TimeoutException     on timeout
    */
-  public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+  public void waitForState(final String collection, long wait, TimeUnit unit, BiPredicate<DocCollection, ShardStateProvider> predicate)
       throws InterruptedException, TimeoutException {
 
     if (closed) {
@@ -1797,7 +1802,7 @@ public class ZkStateReader implements SolrCloseable {
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     DocCollectionWatcher watcher = (c, ssp) -> {
       docCollection.set(c);
-      boolean matches = predicate.test(c);
+      boolean matches = predicate.test(c, getShardStateProvider(collection));
       if (matches)
         latch.countDown();
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index eec0d99..f7f3709 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -205,7 +206,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       throws Exception {
     log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
 
-    zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
+    zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection, ssp) -> {
       if (docCollection == null)
         return true;
       return false;
@@ -219,12 +220,15 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
     ZkStateReader zkStateReader = cloudClient.getZkStateReader();
     zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
 
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(DEFAULT_COLLECTION);
     for (; ; ) {
       ClusterState clusterState = zkStateReader.getClusterState();
       DocCollection coll = clusterState.getCollection("collection1");
       Slice slice = coll.getSlice(shardName);
-      if (slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getLeader().getState() == Replica.State.ACTIVE) {
-        log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(),timeOut.timeElapsed(MILLISECONDS) );
+
+      Replica leader = ssp.getLeader(slice);
+      if (leader != null && !leader.equals(oldLeader) && ssp.getState(leader) == Replica.State.ACTIVE) {
+        log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, leader,timeOut.timeElapsed(MILLISECONDS) );
         break;
       }
 
@@ -237,12 +241,13 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       Thread.sleep(100);
     }
 
-    zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
+    zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection, sp) -> {
       if (docCollection == null)
         return false;
 
       Slice slice = docCollection.getSlice(shardName);
-      if (slice != null && slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getLeader().getState() == Replica.State.ACTIVE) {
+      Replica leader = sp.getLeader(slice);
+      if (slice != null && leader!= null && !leader.equals(oldLeader) && sp.getState(leader) == Replica.State.ACTIVE) {
         log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(), timeOut.timeElapsed(MILLISECONDS) );
         return true;
       }
@@ -254,9 +259,9 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       Replica.State expectedState) throws InterruptedException, TimeoutException {
     log.info("verifyReplicaStatus ({}) shard={} coreNodeName={}", collection, shard, coreNodeName);
     reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
-        (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
+        (collectionState,ssp) -> collectionState != null && collectionState.getSlice(shard) != null
             && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
-            && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
+            &&  ssp.getState(  collectionState.getSlice(shard).getReplicasMap().get(coreNodeName)) == expectedState);
   }
 
   protected static void assertAllActive(String collection, ZkStateReader zkStateReader)
@@ -268,8 +273,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       if (docCollection == null || docCollection.getSlices() == null) {
         throw new IllegalArgumentException("Cannot find collection:" + collection);
       }
-
-      Map<String,Slice> slices = docCollection.getSlicesMap();
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
+    Map<String,Slice> slices = docCollection.getSlicesMap();
       for (Map.Entry<String,Slice> entry : slices.entrySet()) {
         Slice slice = entry.getValue();
         if (slice.getState() != Slice.State.ACTIVE) {
@@ -278,8 +283,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
         Map<String,Replica> shards = slice.getReplicasMap();
         for (Map.Entry<String,Replica> shard : shards.entrySet()) {
           Replica replica = shard.getValue();
-          if (replica.getState() != Replica.State.ACTIVE) {
-            fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + replica.getState());
+          if (ssp.getState(replica) != Replica.State.ACTIVE) {
+            fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + ssp.getState(replica));
           }
         }
       }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index ee8165e..0fa592b 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -2118,13 +2118,14 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
       assertNotNull(cs);
       final DocCollection docCollection = cs.getCollectionOrNull(testCollectionName);
       assertNotNull("No collection found for " + testCollectionName, docCollection);
+      ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
       Slice shard = docCollection.getSlice(shardId);
       assertNotNull("No Slice for "+shardId, shard);
       allReplicasUp = true; // assume true
       Collection<Replica> replicas = shard.getReplicas();
       assertTrue("Did not find correct number of replicas. Expected:" + rf + " Found:" + replicas.size(), replicas.size() == rf);
       
-      leader = shard.getLeader();
+      leader = ssp.getLeader(shard);
       assertNotNull(leader);
       log.info("Found "+replicas.size()+" replicas and leader on "+
         leader.getNodeName()+" for "+shardId+" in "+testCollectionName);
@@ -2132,8 +2133,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
       // ensure all replicas are "active" and identify the non-leader replica
       for (Replica replica : replicas) {
         if (!zkShardTerms.canBecomeLeader(replica.getName()) ||
-            replica.getState() != Replica.State.ACTIVE) {
-          log.info("Replica {} is currently {}", replica.getName(), replica.getState());
+            ssp.getState(replica) != Replica.State.ACTIVE) {
+          log.info("Replica {} is currently {}", replica.getName(), ssp.getState(replica));
           allReplicasUp = false;
         }
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
index a6939b2..bffcd46 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase.CloudJettyRunner;
 import org.apache.solr.common.cloud.DocCollection;
@@ -650,6 +651,7 @@ public class ChaosMonkey {
     if (docCollection == null) {
       monkeyLog("Could not find collection {}", collectionName);
     }
+    ShardStateProvider ssp = zkStateReader.getShardStateProvider(collectionName);
     StringBuilder builder = new StringBuilder();
     builder.append("Collection status: {");
     for (Slice slice:docCollection.getSlices()) {
@@ -660,7 +662,7 @@ public class ChaosMonkey {
         m.find();
         String jettyPort = m.group(1);
         builder.append(String.format(Locale.ROOT, "%s(%s): {state: %s, type: %s, leader: %s, Live: %s}, ", 
-            replica.getName(), jettyPort, replica.getState(), replica.getType(), (replica.get("leader")!= null), zkStateReader.getClusterState().liveNodesContain(replica.getNodeName())));
+            replica.getName(), jettyPort, ssp.getState(replica), replica.getType(), (replica.get("leader")!= null), zkStateReader.getClusterState().liveNodesContain(replica.getNodeName())));
       }
       if (slice.getReplicas().size() > 0) {
         builder.setLength(builder.length() - 2);
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index fed8c12..c4601df 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
       }
       
       for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
-        reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
+        reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState,ssp) -> collectionState == null ? true : false);
       }
      
     }