You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/01/07 15:47:19 UTC

[lucene-solr] branch branch_8x updated: Revert "SOLR-15052: Per-replica states for reducing overseer bottlenecks (branch_8x) (#2148)"

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new e1e5509  Revert "SOLR-15052: Per-replica states for reducing overseer bottlenecks (branch_8x) (#2148)"
e1e5509 is described below

commit e1e55090fb58854ba5bb0b2aa3fdd590a29a1fcc
Author: noblepaul <no...@gmail.com>
AuthorDate: Fri Jan 8 02:46:03 2021 +1100

    Revert "SOLR-15052: Per-replica states for reducing overseer bottlenecks (branch_8x) (#2148)"
    
    This reverts commit c8d6f8c0de8fba2bf1c801ed07e956549f17c1e7.
    
    accidental merge
---
 solr/CHANGES.txt                                   |   3 -
 .../solr/cloud/ShardLeaderElectionContextBase.java |  17 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  61 ++--
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../OverseerCollectionMessageHandler.java          |   1 -
 .../solr/cloud/overseer/CollectionMutator.java     |  31 +-
 .../apache/solr/cloud/overseer/NodeMutator.java    |  11 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  24 +-
 .../apache/solr/cloud/overseer/SliceMutator.java   |  48 +--
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  74 +----
 .../apache/solr/cloud/overseer/ZkWriteCommand.java |  20 --
 .../solr/handler/admin/CollectionsHandler.java     |   6 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  11 +-
 .../test/org/apache/solr/cloud/SplitShardTest.java |   3 -
 .../org/apache/solr/cloud/ZkSolrClientTest.java    |  31 --
 .../solr/cloud/api/collections/ShardSplitTest.java |  12 +-
 .../solr/handler/PingRequestHandlerTest.java       |   2 -
 .../apache/solr/handler/TestContainerPlugin.java   |   1 +
 .../org/apache/solr/handler/TestSQLHandler.java    |   4 +-
 .../solr/handler/TestStressThreadBackup.java       |   6 +-
 .../solr/handler/admin/HealthCheckHandlerTest.java |   1 -
 .../solr/handler/admin/IndexSizeEstimatorTest.java |   1 -
 .../handler/admin/MetricsHistoryHandlerTest.java   |   3 +-
 .../component/CustomHighlightComponentTest.java    |   3 +-
 .../DistributedQueryComponentOptimizationTest.java |   1 -
 .../solr/handler/component/SearchHandlerTest.java  |   4 -
 .../reporters/SolrJmxReporterCloudTest.java        |   1 -
 .../reporters/solr/SolrCloudReportersTest.java     |   2 -
 .../src/test/org/apache/solr/pkg/TestPackages.java |   1 -
 .../transform/TestSubQueryTransformerDistrib.java  |   2 -
 .../schema/ManagedSchemaRoundRobinCloudTest.java   |   1 -
 .../PreAnalyzedFieldManagedSchemaCloudTest.java    |   1 -
 .../apache/solr/schema/TestManagedSchemaAPI.java   |   2 -
 .../solr/search/CurrencyRangeFacetCloudTest.java   |   1 -
 .../solr/search/facet/RangeFacetCloudTest.java     |   1 -
 .../search/facet/TestCloudJSONFacetJoinDomain.java |   1 -
 .../solr/search/facet/TestCloudJSONFacetSKG.java   |   1 -
 .../search/facet/TestCloudJSONFacetSKGEquiv.java   |   1 -
 .../search/join/BlockJoinFacetDistribTest.java     |   1 -
 .../search/join/CrossCollectionJoinQueryTest.java  |   2 -
 .../apache/solr/search/stats/TestDistribIDF.java   |   3 +-
 .../solr/servlet/HttpSolrCallGetCoreTest.java      |   1 -
 .../update/TestInPlaceUpdateWithRouteField.java    |   1 -
 .../update/processor/AtomicUpdateJavabinTest.java  | 370 +++++++++++++++++++++
 .../DimensionalRoutedAliasUpdateProcessorTest.java |   5 +-
 .../processor/TemplateUpdateProcessorTest.java     |   2 +-
 .../test/org/apache/solr/util/TestExportTool.java  |   2 -
 .../solr/util/tracing/TestDistributedTracing.java  |   1 -
 .../client/solrj/cloud/DistribStateManager.java    |   7 -
 .../client/solrj/impl/SolrClientCloudManager.java  |   3 -
 .../client/solrj/impl/ZkDistribStateManager.java   |   6 -
 .../solrj/request/CollectionAdminRequest.java      |   8 -
 .../org/apache/solr/common/cloud/ClusterState.java |  73 ----
 .../apache/solr/common/cloud/DocCollection.java    |  73 +---
 .../apache/solr/common/cloud/PerReplicaStates.java | 310 -----------------
 .../solr/common/cloud/PerReplicaStatesOps.java     | 301 -----------------
 .../java/org/apache/solr/common/cloud/Replica.java |  84 +----
 .../java/org/apache/solr/common/cloud/Slice.java   |  26 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  12 -
 .../apache/solr/common/cloud/ZkStateReader.java    | 107 +-----
 .../IndexingNestedDocuments.java                   |   4 +-
 .../JsonRequestApiHeatmapFacetingTest.java         |   4 +-
 .../ref_guide_examples/JsonRequestApiTest.java     |   3 +-
 .../UsingSolrJRefGuideExamplesTest.java            |   1 -
 .../client/solrj/impl/CloudSolrClientTest.java     | 222 +++++--------
 .../solrj/io/stream/CloudAuthStreamTest.java       |   3 +-
 .../client/solrj/io/stream/JDBCStreamTest.java     |   4 +-
 .../client/solrj/io/stream/MathExpressionTest.java |   5 +-
 .../solrj/io/stream/SelectWithEvaluatorsTest.java  |   4 +-
 .../solrj/io/stream/StreamDecoratorTest.java       |  51 +--
 ...ectJsonQueryRequestFacetingIntegrationTest.java |   4 +-
 .../JsonQueryRequestFacetingIntegrationTest.java   |   4 +-
 .../json/JsonQueryRequestHeatmapFacetingTest.java  |   4 +-
 .../cloud/TestCloudCollectionsListeners.java       |   4 -
 .../common/cloud/TestCollectionStateWatchers.java  |  21 +-
 .../solr/common/cloud/TestPerReplicaStates.java    | 133 --------
 .../org/apache/solr/cloud/SolrCloudTestCase.java   |  10 -
 77 files changed, 599 insertions(+), 1676 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bc83b90..a581443 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -22,8 +22,6 @@ New Features
 
 * SOLR-14560: Add interleaving support in Learning To Rank. (Alessandro Benedetti, Christine Poerschke)
 
-* SOLR-15052: Reducing overseer bottlenecks using per-replica states (noble, Ishan Chattopadhyaya)
-
 Improvements
 ---------------------
 * SOLR-14942: Reduce leader election time on node shutdown by removing election nodes before closing cores.
@@ -47,7 +45,6 @@ Improvements
 
 * SOLR-15062: /api/cluster/zk/ls should give the stat of the current node (noble)
 
-=======
 * SOLR-15069: [child]: the parentFilter parameter is now fully optional and perhaps obsolete.
   (David Smiley)
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index dfe1ea6..b010de2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -25,9 +25,6 @@ import java.util.ArrayList;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -182,14 +179,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
           ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
       assert zkController != null;
       assert zkController.getOverseer() != null;
-      DocCollection coll = zkStateReader.getCollection(this.collection);
-      if (coll == null || coll.getStateFormat() < 2 || ZkController.sendToOverseer(coll, id)) {
-        zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
-      } else {
-        PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
-        PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
-            .persist(coll.getZNode(), zkStateReader.getZkClient());
-      }
+      zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
     }
   }
 
@@ -197,4 +187,9 @@ class ShardLeaderElectionContextBase extends ElectionContext {
     return leaderElector;
   }
 
+  Integer getLeaderZkNodeParentVersion() {
+    synchronized (lock) {
+      return leaderZkNodeParentVersion;
+    }
+  }
 }
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 96e2de0..6cc2afe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -64,8 +64,31 @@ import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.cloud.BeforeReconnect;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ConnectionManager;
+import org.apache.solr.common.cloud.DefaultConnectionStrategy;
+import org.apache.solr.common.cloud.DefaultZkACLProvider;
+import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.cloud.NodesSysPropsCacher;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Replica.Type;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.UrlScheme;
+import org.apache.solr.common.cloud.ZkACLProvider;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkCredentialsProvider;
+import org.apache.solr.common.cloud.ZkMaintenanceUtils;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
@@ -351,7 +374,7 @@ public class ZkController implements Closeable {
               }
 
               cc.cancelCoreRecoveries();
-
+              
               try {
                 registerAllCoresAsDown(registerOnReconnect, false);
               } catch (SessionExpiredException e) {
@@ -361,7 +384,7 @@ public class ZkController implements Closeable {
                 // this is really best effort - in case of races or failure cases where we now need to be the leader, if anything fails,
                 // just continue
                 log.warn("Exception while trying to register all cores as DOWN", e);
-              }
+              } 
 
               // we have to register as live first to pick up docs in the buffer
               createEphemeralLiveNode();
@@ -1586,40 +1609,12 @@ public class ZkController implements Closeable {
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      DocCollection coll = zkStateReader.getCollection(collection);
-      if (forcePublish || sendToOverseer(coll, coreNodeName)) {
-        overseerJobQueue.offer(Utils.toJSON(m));
-      } else {
-        if (log.isDebugEnabled()) {
-          log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
-        }
-        PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
-        PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
-            .persist(coll.getZNode(), zkClient);
-      }
+      overseerJobQueue.offer(Utils.toJSON(m));
     } finally {
       MDCLoggingContext.clear();
     }
   }
 
-  /**
-   * Whether a message needs to be sent to overseer or not
-   */
-  static boolean sendToOverseer(DocCollection coll, String replicaName) {
-    if (coll == null) return true;
-    if (coll.getStateFormat() < 2 || !coll.isPerReplicaState()) return true;
-    Replica r = coll.getReplica(replicaName);
-    if (r == null) return true;
-    Slice shard = coll.getSlice(r.slice);
-    if (shard == null) return true;//very unlikely
-    if (shard.getState() == Slice.State.RECOVERY) return true;
-    if (shard.getParent() != null) return true;
-    for (Slice slice : coll.getSlices()) {
-      if (Objects.equals(shard.getName(), slice.getParent())) return true;
-    }
-    return false;
-  }
-
   public ZkShardTerms getShardTerms(String collection, String shardId) {
     return getCollectionTerms(collection).getShard(shardId);
   }
@@ -2203,7 +2198,7 @@ public class ZkController implements Closeable {
     try {
       if (electionNode != null) {
         // Check whether we came to this node by mistake
-        if ( overseerElector.getContext() != null && overseerElector.getContext().leaderSeqPath == null
+        if ( overseerElector.getContext() != null && overseerElector.getContext().leaderSeqPath == null 
             && !overseerElector.getContext().leaderSeqPath.endsWith(electionNode)) {
           log.warn("Asked to rejoin with wrong election node : {}, current node is {}", electionNode, overseerElector.getContext().leaderSeqPath);
           //however delete it . This is possible when the last attempt at deleting the election node failed.
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a9299cc..89f94a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -179,7 +179,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       while (! waitUntil.hasTimedOut()) {
         waitUntil.sleep(100);
         created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
-        if (created) break;
+        if(created) break;
       }
       if (!created) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 3c474e5..f4e349b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -150,7 +150,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       ZkStateReader.PULL_REPLICAS, "0",
       ZkStateReader.MAX_SHARDS_PER_NODE, "1",
       ZkStateReader.AUTO_ADD_REPLICAS, "false",
-      DocCollection.PER_REPLICA_STATE, null,
       DocCollection.RULE, null,
       POLICY, null,
       SNITCH, null,
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index a816270..80e0e9e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -28,11 +28,8 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.PerReplicaStates;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
@@ -49,12 +46,10 @@ public class CollectionMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
-  protected final SolrZkClient zkClient;
 
   public CollectionMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
-    this.zkClient = SliceMutator.getZkClient(cloudManager);
   }
 
   public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) {
@@ -112,24 +107,10 @@ public class CollectionMutator {
     DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP));
     Map<String, Object> m = coll.shallowCopy();
     boolean hasAnyOps = false;
-    PerReplicaStatesOps replicaOps = null;
     for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
-      if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
-        String val = message.getStr(DocCollection.PER_REPLICA_STATE);
-        if (val == null) continue;
-        boolean enable = Boolean.parseBoolean(val);
-        if (enable == coll.isPerReplicaState()) {
-          //already enabled
-          log.error("trying to set perReplicaState to {} from {}", val, coll.isPerReplicaState());
-          continue;
-        }
-        replicaOps = PerReplicaStatesOps.modifyCollection(coll, enable, PerReplicaStates.fetch(coll.getZNode(), zkClient, null));
-      }
-
-
       if (message.containsKey(prop)) {
         hasAnyOps = true;
-        if (message.get(prop) == null) {
+        if (message.get(prop) == null)  {
           m.remove(prop);
         } else  {
           m.put(prop, message.get(prop));
@@ -155,13 +136,8 @@ public class CollectionMutator {
       return ZkStateWriter.NO_OP;
     }
 
-    DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode());
-    if (replicaOps == null){
-      return new ZkWriteCommand(coll.getName(), collection);
-    } else {
-      return new ZkWriteCommand(coll.getName(), collection, replicaOps, true);
-    }
-
+    return new ZkWriteCommand(coll.getName(),
+        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode()));
   }
 
   public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
@@ -198,3 +174,4 @@ public class CollectionMutator {
     return true;
   }
 }
+
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index e8db2b4..56bcfd5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,7 +26,6 @@ import java.util.Map.Entry;
 
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -46,8 +45,6 @@ public class NodeMutator {
 
     Map<String, DocCollection> collections = clusterState.getCollectionsMap();
     for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-       List<String> downedReplicas = new ArrayList<>();
-
       String collection = entry.getKey();
       DocCollection docCollection = entry.getValue();
 
@@ -71,7 +68,6 @@ public class NodeMutator {
             Replica newReplica = new Replica(replica.getName(), props, collection, slice.getName());
             newReplicas.put(replica.getName(), newReplica);
             needToUpdateCollection = true;
-            downedReplicas.add(replica.getName());
           }
         }
 
@@ -80,12 +76,7 @@ public class NodeMutator {
       }
 
       if (needToUpdateCollection) {
-        if (docCollection.isPerReplicaState()) {
-          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
-              PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
-        } else {
-          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
-        }
+        zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 93d042b..7891cc1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -39,11 +39,8 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
@@ -53,7 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
-import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 public class ReplicaMutator {
@@ -61,12 +57,10 @@ public class ReplicaMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
-  protected SolrZkClient zkClient;
 
   public ReplicaMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
-    this.zkClient = getZkClient(cloudManager);
   }
 
   protected Replica setProperty(Replica replica, String key, String value) {
@@ -248,12 +242,10 @@ public class ReplicaMutator {
     boolean forceSetState = message.getBool(ZkStateReader.FORCE_SET_STATE_PROP, true);
 
     DocCollection collection = prevState.getCollectionOrNull(collectionName);
-
     if (!forceSetState && !CloudUtil.replicaExists(prevState, collectionName, sliceName, coreNodeName)) {
       log.info("Failed to update state because the replica does not exist, {}", message);
       return ZkStateWriter.NO_OP;
     }
-    boolean persistCollectionState = collection != null && collection.isPerReplicaState();
 
     if (coreNodeName == null) {
       coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
@@ -265,7 +257,6 @@ public class ReplicaMutator {
           log.info("Failed to update state because the replica does not exist, {}", message);
           return ZkStateWriter.NO_OP;
         }
-        persistCollectionState = true;
         // if coreNodeName is null, auto assign one
         coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
       }
@@ -280,7 +271,6 @@ public class ReplicaMutator {
       if (sliceName != null) {
         log.debug("shard={} is already registered", sliceName);
       }
-      persistCollectionState = true;
     }
     if (sliceName == null) {
       //request new shardId
@@ -291,15 +281,13 @@ public class ReplicaMutator {
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
-      persistCollectionState = true;
     }
 
-    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
+    Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
-    Replica oldReplica = null;
     if (slice != null) {
-      oldReplica = slice.getReplica(coreNodeName);
+      Replica oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -364,12 +352,7 @@ public class ReplicaMutator {
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
     log.debug("Collection is now: {}", newCollection);
-    if (collection != null && collection.isPerReplicaState()) {
-      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
-      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
-    } else{
-      return new ZkWriteCommand(collectionName, newCollection);
-    }
+    return new ZkWriteCommand(collectionName, newCollection);
   }
 
   /**
@@ -522,3 +505,4 @@ public class ReplicaMutator {
     return collection;
   }
 }
+
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index d0f08cc..40ab1a3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -25,18 +25,14 @@ import java.util.Set;
 import com.google.common.collect.ImmutableSet;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -55,21 +51,10 @@ public class SliceMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
-  protected final SolrZkClient zkClient;
 
   public SliceMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
-    this.zkClient = getZkClient(cloudManager);
-  }
-
-  static SolrZkClient getZkClient(SolrCloudManager cloudManager) {
-    if (cloudManager instanceof SolrClientCloudManager) {
-      SolrClientCloudManager manager = (SolrClientCloudManager) cloudManager;
-      return manager.getZkClient();
-    } else {
-      return null;
-    }
   }
 
   public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -95,15 +80,7 @@ public class SliceMutator {
             ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
             ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
             ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
-
-    if (collection.isPerReplicaState()) {
-      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
-      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
-          PerReplicaStatesOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
-    } else {
-      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
-    }
-
+    return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
   }
 
   public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -129,12 +106,7 @@ public class SliceMutator {
       newSlices.put(slice.getName(), slice);
     }
 
-
-    if (coll.isPerReplicaState()) {
-      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices), PerReplicaStatesOps.deleteReplica(cnn, coll.getPerReplicaStates()) , true);
-    } else {
-      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
-    }
+    return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
   }
 
   public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
@@ -152,7 +124,6 @@ public class SliceMutator {
     Slice slice = slices.get(sliceName);
 
     Replica oldLeader = slice.getLeader();
-    Replica newLeader = null;
     final Map<String, Replica> newReplicas = new LinkedHashMap<>();
     for (Replica replica : slice.getReplicas()) {
       // TODO: this should only be calculated once and cached somewhere?
@@ -161,7 +132,7 @@ public class SliceMutator {
       if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
         replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
       } else if (coreURL.equals(leaderUrl)) {
-        newLeader = replica = new ReplicaMutator(cloudManager).setLeader(replica);
+        replica = new ReplicaMutator(cloudManager).setLeader(replica);
       }
 
       newReplicas.put(replica.getName(), replica);
@@ -170,17 +141,8 @@ public class SliceMutator {
     Map<String, Object> newSliceProps = slice.shallowCopy();
     newSliceProps.put(Slice.REPLICAS, newReplicas);
     slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
-    if (coll.isPerReplicaState()) {
-      PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
-      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
-          PerReplicaStatesOps.flipLeader(
-              slice.getReplicaNames(),
-              newLeader == null ? null : newLeader.getName(),
-              prs), false);
-    } else {
-      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
-    }
-  }    
+    return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+  }
 
   public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
     String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index e69dcbf..cb89371 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -27,7 +27,6 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -65,7 +64,7 @@ public class ZkStateWriter {
   protected final ZkStateReader reader;
   protected final Stats stats;
 
-  protected Map<String, ZkWriteCommand> updates = new HashMap<>();
+  protected Map<String, DocCollection> updates = new HashMap<>();
   private int numUpdates = 0;
   protected ClusterState clusterState = null;
   protected boolean isClusterStateModified = false;
@@ -114,38 +113,6 @@ public class ZkStateWriter {
     if (cmds.isEmpty()) return prevState;
     if (isNoOps(cmds)) return prevState;
 
-    boolean forceFlush = false;
-    if (cmds.size() == 1) {
-      //most messages result in only one command. let's deal with it right away
-      ZkWriteCommand cmd = cmds.get(0);
-      if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
-        //we do not wish to batch any updates for collections with per-replica state because
-        // these changes go to individual ZK nodes and there is zero advantage to batching
-        //now check if there are any updates for the same collection already present
-        if (updates.containsKey(cmd.name)) {
-          //this should not happen
-          // but let's get those updates out anyway
-          writeUpdate(updates.remove(cmd.name));
-        }
-        //now let's write the current message
-        try {
-          return writeUpdate(cmd);
-        } finally {
-          if (callback !=null) callback.onWrite();
-        }
-      }
-    } else {
-      //there are more than one commands created as a result of this message
-      for (ZkWriteCommand cmd : cmds) {
-        if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
-          // we don't try to optimize for this case. let's flush out all after this
-          forceFlush = true;
-          break;
-        }
-      }
-    }
-
-
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
       if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
@@ -153,13 +120,13 @@ public class ZkStateWriter {
       }
       prevState = prevState.copyWith(cmd.name, cmd.collection);
       if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
-        updates.put(cmd.name, cmd);
+        updates.put(cmd.name, cmd.collection);
         numUpdates++;
       }
     }
     clusterState = prevState;
 
-    if (forceFlush || maybeFlushAfter()) {
+    if (maybeFlushAfter()) {
       ClusterState state = writePendingUpdates();
       if (callback != null) {
         callback.onWrite();
@@ -198,15 +165,7 @@ public class ZkStateWriter {
   public boolean hasPendingUpdates() {
     return numUpdates != 0 || isClusterStateModified;
   }
-  public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
-    Map<String, ZkWriteCommand> commands = new HashMap<>();
-    commands.put(command.name, command);
-    return writePendingUpdates(commands);
-  }
-  public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
-    return writePendingUpdates(updates);
 
-  }
   /**
    * Writes all pending updates to ZooKeeper and returns the modified cluster state
    *
@@ -215,30 +174,20 @@ public class ZkStateWriter {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if ((updates == this.updates)
-        && !hasPendingUpdates()) {
-      return clusterState;
-    }
+    if (!hasPendingUpdates()) return clusterState;
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
+        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          ZkWriteCommand cmd = entry.getValue();
-          DocCollection c = cmd.collection;
+          DocCollection c = entry.getValue();
 
-          if (cmd.ops != null && cmd.ops.isPreOp()) {
-            cmd.ops.persist(path, reader.getZkClient());
-            clusterState = clusterState.copyWith(name,
-                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
-          }
-          if (!cmd.persistCollState) continue;
           if (c == null) {
             // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
             log.debug("going to delete state.json {}", path);
@@ -261,14 +210,6 @@ public class ZkStateWriter {
           } else if (c.getStateFormat() == 1) {
             isClusterStateModified = true;
           }
-          if (cmd.ops != null && !cmd.ops.isPreOp()) {
-            cmd.ops.persist(path, reader.getZkClient());
-            DocCollection currentCollState = clusterState.getCollection(cmd.name);
-            if ( currentCollState != null) {
-              clusterState = clusterState.copyWith(name,
-                  currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
-            }
-          }
         }
 
         updates.clear();
@@ -317,3 +258,4 @@ public class ZkStateWriter {
     void onWrite() throws Exception;
   }
 }
+
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index 39d953c..d464863 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,34 +17,16 @@
 package org.apache.solr.cloud.overseer;
 
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStatesOps;
 
 public class ZkWriteCommand {
-
   public final String name;
   public final DocCollection collection;
-
   public final boolean noop;
-  // persist the collection state. If this is false, it means the collection state is not modified
-  public final boolean persistCollState;
-  public final PerReplicaStatesOps ops;
 
-  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
-    boolean isPerReplicaState = collection.isPerReplicaState();
-    this.name = name;
-    this.collection = collection;
-    this.noop = false;
-    this.ops = isPerReplicaState ? replicaOps : null;
-    this.persistCollState = isPerReplicaState ? persistCollState : true;
-  }
   public ZkWriteCommand(String name, DocCollection collection) {
     this.name = name;
     this.collection = collection;
     this.noop = false;
-    persistCollState = true;
-    this.ops = collection != null && collection.isPerReplicaState() ?
-        PerReplicaStatesOps.touchChildren():
-        null;
   }
 
   /**
@@ -54,8 +36,6 @@ public class ZkWriteCommand {
     this.noop = true;
     this.name = null;
     this.collection = null;
-    this.ops = null;
-    persistCollState = true;
   }
 
   public static ZkWriteCommand noop() {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 858eef8..0832126 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -123,7 +123,6 @@ import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHan
 import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-import static org.apache.solr.common.cloud.DocCollection.PER_REPLICA_STATE;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
@@ -496,7 +495,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           POLICY,
           WAIT_FOR_FINAL_STATE,
           WITH_COLLECTION,
-          PER_REPLICA_STATE,
           ALIAS);
 
       props.putIfAbsent(STATE_FORMAT, "2");
@@ -1455,9 +1453,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
               }
               if (!n.contains(replica.getNodeName())
                   || !state.equals(Replica.State.ACTIVE.toString())) {
-                if (log.isDebugEnabled()) {
-                  log.debug("inactive replica {} , state {}", replica.getName(), replica.getReplicaState());
-                }
                 replicaNotAliveCnt++;
                 return false;
               }
@@ -1469,6 +1464,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         return false;
       });
     } catch (TimeoutException | InterruptedException e) {
+
       String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
       throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index f2b6e59..e1f502e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -112,7 +112,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateWithDefaultConfigSet() throws Exception {
     String collectionName = "solrj_default_configset";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -245,7 +244,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
       assertEquals("2", String.valueOf(clusterProperty));
       CollectionAdminResponse response = CollectionAdminRequest
           .createCollection(COLL_NAME, "conf", null, null, null, null)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(cluster.getSolrClient());
       assertEquals(0, response.getStatus());
       assertTrue(response.isSuccess());
@@ -421,9 +419,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateAndDeleteAlias() throws IOException, SolrServerException {
 
     final String collection = "aliasedCollection";
-    CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 1, 1).process(cluster.getSolrClient());
 
     CollectionAdminResponse response
         = CollectionAdminRequest.createAlias("solrj_alias", collection).process(cluster.getSolrClient());
@@ -438,7 +434,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
     final String collectionName = "solrj_test_splitshard";
     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     cluster.waitForActiveCollection(collectionName, 2, 2);
@@ -493,7 +488,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     cluster.getJettySolrRunners().forEach(j -> j.getCoreContainer().getAllowPaths().add(tmpDir));
 
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty(CoreAdminParams.DATA_DIR, dataDir.toString())
         .withProperty(CoreAdminParams.ULOG_DIR, ulogDir.toString())
         .process(cluster.getSolrClient());
@@ -520,7 +514,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
     final String collectionName = "solrj_replicatests";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 1, 2);
@@ -591,7 +584,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     final String propName = "testProperty";
 
     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -623,7 +615,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testColStatus() throws Exception {
     final String collectionName = "collectionStatusTest";
     CollectionAdminRequest.createCollection(collectionName, "conf2", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     cluster.waitForActiveCollection(collectionName, 2, 4);
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index 365c396..0e36b57 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -77,7 +77,6 @@ public class SplitShardTest extends SolrCloudTestCase {
   public void doTest() throws IOException, SolrServerException {
     CollectionAdminRequest
         .createCollection(COLLECTION_NAME, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(100)
         .process(cluster.getSolrClient());
     
@@ -129,7 +128,6 @@ public class SplitShardTest extends SolrCloudTestCase {
     String collectionName = "splitFuzzCollection";
     CollectionAdminRequest
         .createCollection(collectionName, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(100)
         .process(cluster.getSolrClient());
 
@@ -159,7 +157,6 @@ public class SplitShardTest extends SolrCloudTestCase {
 
       CollectionAdminRequest
           .createCollection(collectionName, "conf", 1, repFactor)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .setMaxShardsPerNode(100)
           .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
index 3f86835..e9afc3b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
@@ -18,9 +18,7 @@ package org.apache.solr.cloud;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -31,16 +29,12 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.apache.solr.cloud.SolrCloudTestCase.configureCluster;
-
 public class ZkSolrClientTest extends SolrTestCaseJ4 {
 
   @BeforeClass
@@ -382,31 +376,6 @@ public class ZkSolrClientTest extends SolrTestCaseJ4 {
     }
   }
 
-  public void testZkBehavior() throws Exception {
-    MiniSolrCloudCluster cluster =
-        configureCluster(4)
-            .withJettyConfig(jetty -> jetty.enableV2(true))
-            .configure();
-    try {
-      SolrZkClient zkClient = cluster.getZkClient();
-      zkClient.create("/test-node", null, CreateMode.PERSISTENT, true);
-
-      Stat stat = zkClient.exists("/test-node", null, true);
-      int cversion = stat.getCversion();
-      List<Op> ops = Arrays.asList(
-          Op.create("/test-node/abc", null, zkClient.getZkACLProvider().getACLsToAdd("/test-node/abc"), CreateMode.PERSISTENT),
-          Op.delete("/test-node/abc", -1));
-      zkClient.multi(ops, true);
-      stat = zkClient.exists("/test-node", null, true);
-      assertTrue(stat.getCversion() >= cversion + 2);
-    } finally {
-      cluster.shutdown();
-    }
-
-  }
-
-
-
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 60c704c..e3b2634 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -77,7 +77,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
 @Slow
-@LogLevel("org.apache.solr.common.cloud.PerReplicaStates=DEBUG;org.apache.solr.common.cloud=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.cloud.api.collections=DEBUG;org.apache.solr.cloud.OverseerTaskProcessor=DEBUG;org.apache.solr.util.TestInjection=DEBUG")
+@LogLevel("org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.cloud.api.collections=DEBUG;org.apache.solr.cloud.OverseerTaskProcessor=DEBUG;org.apache.solr.util.TestInjection=DEBUG")
 public class ShardSplitTest extends BasicDistributedZkTest {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -140,7 +140,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
     String collectionName = "testSplitStaticIndexReplication_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
-    create.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
     create.process(cloudClient);
@@ -360,7 +359,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     waitForThingsToLevelOut(15);
     String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 0, 2); // TODO tlog replicas disabled right now.
-    create.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.process(cloudClient);
     
@@ -654,7 +652,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     log.info("Starting testSplitShardWithRule");
     String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
     CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setRule("shard:*,replica:<2,node:*");
 
     CollectionAdminResponse response = createRequest.process(cloudClient);
@@ -823,9 +820,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
           MAX_SHARDS_PER_NODE, maxShardsPerNode,
           OverseerCollectionMessageHandler.NUM_SLICES, numShards,
           "router.field", shard_fld);
-      if (SolrCloudTestCase.USE_PER_REPLICA_STATE) {
-        props.put(DocCollection.PER_REPLICA_STATE, Boolean.TRUE);
-      }
 
       createCollection(collectionInfos, collectionName, props, client);
     }
@@ -887,9 +881,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
           REPLICATION_FACTOR, replicationFactor,
           MAX_SHARDS_PER_NODE, maxShardsPerNode,
           OverseerCollectionMessageHandler.NUM_SLICES, numShards);
-     if (SolrCloudTestCase.USE_PER_REPLICA_STATE) {
-       props.put(DocCollection.PER_REPLICA_STATE, Boolean.TRUE);
-     }
+
       createCollection(collectionInfos, collectionName,props,client);
     }
 
diff --git a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
index 9d196cf..dd8aead 100644
--- a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.SolrPing;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
@@ -189,7 +188,6 @@ public class PingRequestHandlerTest extends SolrTestCaseJ4 {
       String configName = "solrCloudCollectionConfig";
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1").resolve("conf"), configName);
       CollectionAdminRequest.createCollection(collectionName, configName, NUM_SHARDS, REPLICATION_FACTOR)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       // Send distributed and non-distributed ping query
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 7e5bc9b..5ce53d6 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -20,6 +20,7 @@ package org.apache.solr.handler;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+//import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 5bc68532..96555ca 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -63,9 +63,7 @@ public class TestSQLHandler extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(collection, 2, 2);
     if (useAlias) {
       CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
index 626b889..3622948 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
@@ -35,6 +35,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.lucene.util.TestUtil;
 
 import org.apache.solr.client.solrj.SolrClient;
@@ -60,7 +61,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-//@Nightly
+@Nightly
 @SuppressCodecs({"SimpleText"})
 @LogLevel("org.apache.solr.handler.SnapShooter=DEBUG;org.apache.solr.core.IndexDeletionPolicyWrapper=DEBUG")
 public class TestStressThreadBackup extends SolrCloudTestCase {
@@ -95,8 +96,7 @@ public class TestStressThreadBackup extends SolrCloudTestCase {
         .configure();
 
     assertEquals(0, (CollectionAdminRequest.createCollection(DEFAULT_TEST_COLLECTION_NAME, "conf1", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient()).getStatus()));
+                     .process(cluster.getSolrClient()).getStatus()));
     adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
     initCoreNameAndSolrCoreClient();
   }
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
index 6564fc3..f8807ca 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
@@ -79,7 +79,6 @@ public class HealthCheckHandlerTest extends SolrCloudTestCase {
     try (HttpSolrClient httpSolrClient = getHttpSolrClient(cluster.getJettySolrRunner(0).getBaseUrl().toString())) {
       CollectionAdminResponse collectionAdminResponse = CollectionAdminRequest.createCollection("test", "_default", 1, 1)
           .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(httpSolrClient);
       assertEquals(0, collectionAdminResponse.getStatus());
       SolrResponse response = req.process(httpSolrClient);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/IndexSizeEstimatorTest.java b/solr/core/src/test/org/apache/solr/handler/admin/IndexSizeEstimatorTest.java
index 5dfb925..b092be1 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/IndexSizeEstimatorTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/IndexSizeEstimatorTest.java
@@ -75,7 +75,6 @@ public class IndexSizeEstimatorTest extends SolrCloudTestCase {
         .configure();
     solrClient = cluster.getSolrClient();
     CollectionAdminRequest.createCollection(collection, "conf", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(2).process(solrClient);
     cluster.waitForActiveCollection(collection, 2, 4);
     SolrInputDocument lastDoc = addDocs(collection, NUM_DOCS);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
index 3c184e5..27987bd 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
@@ -95,8 +95,7 @@ public class MetricsHistoryHandlerTest extends SolrCloudTestCase {
 
     // create .system collection
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL,
-        "conf", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
+        "conf", 1, 1);
     create.process(solrClient);
     CloudUtil.waitForState(cloudManager, "failed to create " + CollectionAdminParams.SYSTEM_COLL,
         CollectionAdminParams.SYSTEM_COLL, CloudUtil.clusterShape(1, 1));
diff --git a/solr/core/src/test/org/apache/solr/handler/component/CustomHighlightComponentTest.java b/solr/core/src/test/org/apache/solr/handler/component/CustomHighlightComponentTest.java
index d026ff4..84dd45a 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/CustomHighlightComponentTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/CustomHighlightComponentTest.java
@@ -128,8 +128,7 @@ public class CustomHighlightComponentTest extends SolrCloudTestCase {
     // create an empty collection
     CollectionAdminRequest
     .createCollection(COLLECTION, "conf", numShards, numReplicas)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .setMaxShardsPerNode(maxShardsPerNode)
+    .setMaxShardsPerNode(maxShardsPerNode)
     .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), false, true, DEFAULT_TIMEOUT);
   }
diff --git a/solr/core/src/test/org/apache/solr/handler/component/DistributedQueryComponentOptimizationTest.java b/solr/core/src/test/org/apache/solr/handler/component/DistributedQueryComponentOptimizationTest.java
index 865f9be..8572ae4 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/DistributedQueryComponentOptimizationTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/DistributedQueryComponentOptimizationTest.java
@@ -62,7 +62,6 @@ public class DistributedQueryComponentOptimizationTest extends SolrCloudTestCase
         .configure();
 
     CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(1)
         .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index aa17b55..f0b2973 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -30,7 +30,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -141,7 +140,6 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
     
       QueryRequest req = new QueryRequest();
@@ -184,7 +182,6 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       ModifiableSolrParams params = new ModifiableSolrParams();
@@ -232,7 +229,6 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 1)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
index 43949ad..42e7159 100644
--- a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
@@ -60,7 +60,6 @@ public class SolrJmxReporterCloudTest extends SolrCloudTestCase {
         .configure();
     CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
         .setMaxShardsPerNode(2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
   }
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/metrics/reporters/solr/SolrCloudReportersTest.java b/solr/core/src/test/org/apache/solr/metrics/reporters/solr/SolrCloudReportersTest.java
index 181c38c..cbd6092 100644
--- a/solr/core/src/test/org/apache/solr/metrics/reporters/solr/SolrCloudReportersTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/reporters/solr/SolrCloudReportersTest.java
@@ -68,7 +68,6 @@ public class SolrCloudReportersTest extends SolrCloudTestCase {
     cluster.uploadConfigSet(Paths.get(TEST_PATH().toString(), "configsets", "minimal", "conf"), "test");
 
     CollectionAdminRequest.createCollection("test_collection", "test", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(4)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("test_collection", 2, 4);
@@ -171,7 +170,6 @@ public class SolrCloudReportersTest extends SolrCloudTestCase {
     cluster.uploadConfigSet(Paths.get(TEST_PATH().toString(), "configsets", "minimal", "conf"), "test");
 
     CollectionAdminRequest.createCollection("test_collection", "test", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(4)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("test_collection", 2, 4);
diff --git a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
index 9b30753..87134a3 100644
--- a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
+++ b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
@@ -144,7 +144,6 @@ public class TestPackages extends SolrCloudTestCase {
 
       CollectionAdminRequest
           .createCollection(COLLECTION_NAME, "conf", 2, 2)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .setMaxShardsPerNode(100)
           .process(cluster.getSolrClient());
       cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
diff --git a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
index 524dff9..0f9221c 100644
--- a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
+++ b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
@@ -78,12 +78,10 @@ public class TestSubQueryTransformerDistrib extends SolrCloudTestCase {
     int replicas = 2 ;
     CollectionAdminRequest.createCollection(people, configName, shards, replicas)
         .withProperty("config", "solrconfig-doctransformers.xml")
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty("schema", "schema-docValuesJoin.xml")
         .process(cluster.getSolrClient());
 
     CollectionAdminRequest.createCollection(depts, configName, shards, replicas)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty("config", "solrconfig-doctransformers.xml")
         .withProperty("schema", 
               differentUniqueId ? "schema-minimal-with-another-uniqkey.xml":
diff --git a/solr/core/src/test/org/apache/solr/schema/ManagedSchemaRoundRobinCloudTest.java b/solr/core/src/test/org/apache/solr/schema/ManagedSchemaRoundRobinCloudTest.java
index 44995bd..883ebfd 100644
--- a/solr/core/src/test/org/apache/solr/schema/ManagedSchemaRoundRobinCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/schema/ManagedSchemaRoundRobinCloudTest.java
@@ -46,7 +46,6 @@ public class ManagedSchemaRoundRobinCloudTest extends SolrCloudTestCase {
     System.setProperty("managed.schema.mutable", "true");
     configureCluster(NUM_SHARDS).addConfig(CONFIG, configset(CONFIG)).configure();
     CollectionAdminRequest.createCollection(COLLECTION, CONFIG, NUM_SHARDS, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(1)
         .process(cluster.getSolrClient());
     cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
diff --git a/solr/core/src/test/org/apache/solr/schema/PreAnalyzedFieldManagedSchemaCloudTest.java b/solr/core/src/test/org/apache/solr/schema/PreAnalyzedFieldManagedSchemaCloudTest.java
index d56939d..04e1be0 100644
--- a/solr/core/src/test/org/apache/solr/schema/PreAnalyzedFieldManagedSchemaCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/schema/PreAnalyzedFieldManagedSchemaCloudTest.java
@@ -40,7 +40,6 @@ public class PreAnalyzedFieldManagedSchemaCloudTest extends SolrCloudTestCase {
   public static void setupCluster() throws Exception {
     configureCluster(2).addConfig(CONFIG, configset(CONFIG)).configure();
     CollectionAdminRequest.createCollection(COLLECTION, CONFIG, 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(1)
         .process(cluster.getSolrClient());
     cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
diff --git a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
index f022ac3..a2375ba 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
@@ -51,8 +51,6 @@ public class TestManagedSchemaAPI extends SolrCloudTestCase {
   public void test() throws Exception {
     String collection = "testschemaapi";
     CollectionAdminRequest.createCollection(collection, "conf1", 1, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-
         .process(cluster.getSolrClient());
     testModifyField(collection);
     testReloadAndAddSimple(collection);
diff --git a/solr/core/src/test/org/apache/solr/search/CurrencyRangeFacetCloudTest.java b/solr/core/src/test/org/apache/solr/search/CurrencyRangeFacetCloudTest.java
index 86b3175..61ad8c2 100644
--- a/solr/core/src/test/org/apache/solr/search/CurrencyRangeFacetCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/search/CurrencyRangeFacetCloudTest.java
@@ -74,7 +74,6 @@ public class CurrencyRangeFacetCloudTest extends SolrCloudTestCase {
       .configure();
 
     assertEquals(0, (CollectionAdminRequest.createCollection(COLLECTION, CONF, numShards, numReplicas)
-                      .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
                      .setMaxShardsPerNode(maxShardsPerNode)
                      .setProperties(Collections.singletonMap(CoreAdminParams.CONFIG, "solrconfig-minimal.xml"))
                      .process(cluster.getSolrClient())).getStatus());
diff --git a/solr/core/src/test/org/apache/solr/search/facet/RangeFacetCloudTest.java b/solr/core/src/test/org/apache/solr/search/facet/RangeFacetCloudTest.java
index 7bdf34f..8e4cd11 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/RangeFacetCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/RangeFacetCloudTest.java
@@ -93,7 +93,6 @@ public class RangeFacetCloudTest extends SolrCloudTestCase {
       .configure();
 
     assertEquals(0, (CollectionAdminRequest.createCollection(COLLECTION, CONF, numShards, numReplicas)
-                     .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
                      .setMaxShardsPerNode(maxShardsPerNode)
                      .setProperties(Collections.singletonMap(CoreAdminParams.CONFIG, "solrconfig-minimal.xml"))
                      .process(cluster.getSolrClient())).getStatus());
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
index c1dc549..a88ed95 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
@@ -109,7 +109,6 @@ public class TestCloudJSONFacetJoinDomain extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
index 2c0754a..0992af8 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
@@ -139,7 +139,6 @@ public class TestCloudJSONFacetSKG extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
index aae946e..eb68662 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
@@ -133,7 +133,6 @@ public class TestCloudJSONFacetSKGEquiv extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/join/BlockJoinFacetDistribTest.java b/solr/core/src/test/org/apache/solr/search/join/BlockJoinFacetDistribTest.java
index 2c04d25..c4f0896 100644
--- a/solr/core/src/test/org/apache/solr/search/join/BlockJoinFacetDistribTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/BlockJoinFacetDistribTest.java
@@ -66,7 +66,6 @@ public class BlockJoinFacetDistribTest extends SolrCloudTestCase{
     int shards = 3;
     int replicas = 2 ;
     CollectionAdminRequest.createCollection(collection, configName, shards, replicas)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
     
diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
index a42ebaa..ebdb960 100644
--- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
@@ -54,11 +54,9 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase {
 
 
     CollectionAdminRequest.createCollection("products", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     CollectionAdminRequest.createCollection("parts", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
   }
diff --git a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
index c7f4564..205b30b 100644
--- a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
+++ b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
@@ -29,7 +29,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
@@ -202,7 +201,7 @@ public class TestDistribIDF extends SolrTestCaseJ4 {
       response = create.process(solrCluster.getSolrClient());
       solrCluster.waitForActiveCollection(name, 3, 3);
     } else {
-      CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
+      CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1);
       create.setMaxShardsPerNode(1);
       response = create.process(solrCluster.getSolrClient());
       solrCluster.waitForActiveCollection(name, 2, 2);
diff --git a/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java b/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
index 3889f32..4f94388 100644
--- a/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
+++ b/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
@@ -48,7 +48,6 @@ public class HttpSolrCallGetCoreTest extends SolrCloudTestCase {
 
     CollectionAdminRequest
         .createCollection(COLLECTION, "config", NUM_SHARD, REPLICA_FACTOR)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(NUM_SHARD * REPLICA_FACTOR)
         .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
index 1e0b6f8..fc85fad 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
@@ -73,7 +73,6 @@ public class TestInPlaceUpdateWithRouteField extends SolrCloudTestCase {
     boolean implicit = random().nextBoolean();
     String routerName = implicit ? "implicit":"compositeId";
     Create createCmd = CollectionAdminRequest.createCollection(COLLECTION, configName, shards.length, replicas)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(shards.length * replicas)
         .setProperties(collectionProperties)
         .setRouterName(routerName)
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateJavabinTest.java b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateJavabinTest.java
new file mode 100644
index 0000000..758de5f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateJavabinTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests Solr's atomic-update functionality using requests sent through SolrJ using wt=javabin
+ *
+ * {@link AtomicUpdatesTest} covers some of the same functionality, but does so by making xml-based requests.  Recent
+ * changes to Solr have made it possible for the same data sent with different formats to result in different NamedLists
+ * after unmarshalling, so the test duplication is now necessary.  See SOLR-13331 for an example.
+ */
+public class AtomicUpdateJavabinTest extends SolrCloudTestCase {
+  private static final String COMMITTED_DOC_ID = "1";
+  private static final String COMMITTED_DOC_STR_VALUES_ID = "1s";
+  private static final String UNCOMMITTED_DOC_ID = "2";
+  private static final String UNCOMMITTED_DOC_STR_VALUES_ID = "2s";
+  private static final String COLLECTION = "collection1";
+  private static final int NUM_SHARDS = 1;
+  private static final int NUM_REPLICAS = 1;
+  private static final Date DATE_1 = Date.from(Instant.ofEpochSecond(1554243309));
+  private static final String DATE_1_STR = "2019-04-02T22:15:09Z";
+  private static final Date DATE_2 = Date.from(Instant.ofEpochSecond(1554243609));
+  private static final String DATE_2_STR = "2019-04-02T22:20:09Z";
+  private static final Date DATE_3 = Date.from(Instant.ofEpochSecond(1554243909));
+  private static final String DATE_3_STR = "2019-04-02T22:25:09Z";
+
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("conf", configset("cloud-dynamic"))
+        .configure();
+
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", NUM_SHARDS, NUM_REPLICAS)
+        .process(cluster.getSolrClient());
+
+    cluster.waitForActiveCollection(COLLECTION, 1, 1);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final SolrInputDocument committedDoc = sdoc(
+            "id", COMMITTED_DOC_ID,
+            "title_s", "title_1", "title_s", "title_2",
+            "tv_mv_text", "text_1", "tv_mv_text", "text_2",
+            "count_is", 1, "count_is", 2,
+            "count_md", 1.0, "count_md", 2.0,
+            "timestamps_mdt", DATE_1, "timestamps_mdt", DATE_2);
+    final SolrInputDocument committedStrDoc = sdoc(
+            "id", COMMITTED_DOC_STR_VALUES_ID,
+            "title_s", "title_1", "title_s", "title_2",
+            "tv_mv_text", "text_1", "tv_mv_text", "text_2",
+            "count_is", "1", "count_is", "2",
+            "count_md", "1.0", "count_md", "2.0",
+            "timestamps_mdt", DATE_1_STR, "timestamps_mdt", DATE_2_STR);
+    final UpdateRequest committedRequest = new UpdateRequest()
+            .add(committedDoc)
+            .add(committedStrDoc);
+    committedRequest.commit(cluster.getSolrClient(), COLLECTION);
+
+    // Upload a copy of id:1 that's uncommitted to test how atomic-updates modify values in the tlog
+    // See SOLR-14971 for an example of why this case needs tested separately
+    final SolrInputDocument uncommittedDoc = sdoc(
+            "id", UNCOMMITTED_DOC_ID,
+            "title_s", "title_1", "title_s", "title_2",
+            "tv_mv_text", "text_1", "tv_mv_text", "text_2",
+            "count_is", 1, "count_is", 2,
+            "count_md", 1.0, "count_md", 2.0,
+            "timestamps_mdt", DATE_1, "timestamps_mdt", DATE_2);
+    final SolrInputDocument uncommittedStrDoc = sdoc(
+            "id", UNCOMMITTED_DOC_STR_VALUES_ID,
+            "title_s", "title_1", "title_s", "title_2",
+            "tv_mv_text", "text_1", "tv_mv_text", "text_2",
+            "count_is", "1", "count_is", "2",
+            "count_md", "1.0", "count_md", "2.0",
+            "timestamps_mdt", DATE_1_STR, "timestamps_mdt", DATE_2_STR);
+    final UpdateRequest uncommittedRequest = new UpdateRequest()
+            .add(uncommittedDoc)
+            .add(uncommittedStrDoc);
+    uncommittedRequest.process(cluster.getSolrClient(), COLLECTION);
+  }
+
+  @Test
+  public void testAtomicUpdateRemovalOfStrField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicRemoveValueFromField(COMMITTED_DOC_ID, "title_s", "title_1");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_2");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicRemoveValueFromField(UNCOMMITTED_DOC_ID, "title_s", "title_1");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_2");
+  }
+
+  @Test
+  public void testAtomicUpdateRemovalOfTextField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicRemoveValueFromField(COMMITTED_DOC_ID, "tv_mv_text", "text_1");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_2");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicRemoveValueFromField(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_2");
+  }
+
+  @Test
+  public void testAtomicUpdateRemovalOfIntField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicRemoveValueFromField(COMMITTED_DOC_ID, "count_is", 1);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicRemoveValueFromField(UNCOMMITTED_DOC_ID, "count_is", 1);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 2);
+  }
+
+  @Test
+  public void testAtomicUpdateRemovalOfDoubleField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicRemoveValueFromField(COMMITTED_DOC_ID, "count_md", 1.0);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 2.0);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicRemoveValueFromField(UNCOMMITTED_DOC_ID, "count_md", 1.0);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 2.0);
+  }
+
+  @Test
+  public void testAtomicUpdateRemovalOfDateField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicRemoveValueFromField(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicRemoveValueFromField(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_2);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDistinctValueOnStrField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "title_s", "title_3");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_1", "title_2", "title_3");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "title_s", "title_3");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_1", "title_2", "title_3");
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDuplicateValueOnStrField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "title_s", "title_2");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "title_s", "title_2");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "title_s", "title_1", "title_2");
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDistinctValueOnTextField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "tv_mv_text", "text_3");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2", "text_3");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_3");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2", "text_3");
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDuplicateValueOnTextField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "tv_mv_text", "text_2");
+    ensureFieldHasValues(COMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_2");
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "tv_mv_text", "text_1", "text_2");
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDistinctValueOnIntField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "count_is", 3);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 1, 2, 3);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "count_is", 3);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2, 3);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "count_is", 3);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 1, 2, 3);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 3);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2, 3);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDuplicateValueOnIntField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "count_is", 2);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_is", 1, 2);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "count_is", 2);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "count_is", 2);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_is", 1, 2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 2);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_is", 1, 2);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDistinctValueOnDoubleField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "count_md", 3.0);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 1.0, 2.0, 3.0);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "count_md", 3.0);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0, 3.0);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "count_md", 3.0);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 1.0, 2.0, 3.0);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 3.0);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0, 3.0);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDuplicateValueOnDoubleField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "count_md", 2.0);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "count_md", 2.0);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "count_md", 2.0);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "count_md", 1.0, 2.0);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 2.0);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "count_md", 1.0, 2.0);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDistinctValueOnDateField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "timestamps_mdt", DATE_3);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2, DATE_3);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_3);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2, DATE_3);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_3);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2, DATE_3);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_3);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2, DATE_3);
+  }
+
+  @Test
+  public void testAtomicUpdateAddDistinctOfDuplicateValueOnDateField() throws Exception {
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_ID, "timestamps_mdt", DATE_2);
+    ensureFieldHasValues(COMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_2);
+    ensureFieldHasValues(COMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_2);
+    ensureFieldHasValues(UNCOMMITTED_DOC_ID, "timestamps_mdt", DATE_1, DATE_2);
+
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+    atomicAddDistinctValueToField(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_2);
+    ensureFieldHasValues(UNCOMMITTED_DOC_STR_VALUES_ID, "timestamps_mdt", DATE_1, DATE_2);
+  }
+
+  private void atomicRemoveValueFromField(String docId, String fieldName, Object value) throws Exception {
+    final SolrInputDocument doc = new SolrInputDocument();
+    doc.setField("id", docId);
+    Map<String, Object> atomicUpdateRemoval = new HashMap<>(1);
+    atomicUpdateRemoval.put("remove", value);
+    doc.setField(fieldName, atomicUpdateRemoval);
+
+    cluster.getSolrClient().add(COLLECTION, doc);
+  }
+
+  private void atomicAddDistinctValueToField(String docId, String fieldName, Object value) throws Exception {
+    final SolrInputDocument doc = new SolrInputDocument();
+    doc.setField("id", docId);
+    Map<String, Object> atomicUpdateRemoval = new HashMap<>(1);
+    atomicUpdateRemoval.put("add-distinct", value);
+    doc.setField(fieldName, atomicUpdateRemoval);
+
+    cluster.getSolrClient().add(COLLECTION, doc);
+  }
+
+  private void ensureFieldHasValues(String identifyingDocId, String fieldName, Object... expectedValues) throws Exception {
+    final ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.set("id", identifyingDocId);
+    QueryRequest request = new QueryRequest(solrParams);
+    request.setPath("/get");
+    final QueryResponse response = request.process(cluster.getSolrClient(), COLLECTION);
+
+    final NamedList<Object> rawResponse = response.getResponse();
+    assertTrue(rawResponse.get("doc") != null);
+    assertTrue(rawResponse.get("doc") instanceof SolrDocument);
+    final SolrDocument doc = (SolrDocument) rawResponse.get("doc");
+    final Collection<Object> valuesAfterUpdate = doc.getFieldValues(fieldName);
+    assertEquals("Expected field to have " + expectedValues.length + " values, but found " + valuesAfterUpdate.size(),
+            expectedValues.length, valuesAfterUpdate.size());
+    for (Object expectedValue: expectedValues) {
+      assertTrue("Expected value [" + expectedValue + "] was not found in field", valuesAfterUpdate.contains(expectedValue));
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java
index 3ce4221..30faefa 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java
@@ -35,7 +35,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest.CreateTimeRou
 import org.apache.solr.client.solrj.response.FieldStatsInfo;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.api.collections.CategoryRoutedAlias;
 import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
 import org.apache.solr.common.SolrDocument;
@@ -101,8 +100,7 @@ public class DimensionalRoutedAliasUpdateProcessorTest extends RoutedAliasUpdate
 
     CollectionAdminRequest.DimensionalRoutedAlias dra = CollectionAdminRequest.createDimensionalRoutedAlias(getAlias(),
         CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
-            .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-            .setMaxShardsPerNode(2), TRA_Dim, CRA_Dim);
+            .setMaxShardsPerNode(2), TRA_Dim,  CRA_Dim);
 
     SolrParams params = dra.getParams();
     assertEquals("Dimensional[TIME,CATEGORY]", params.get(CollectionAdminRequest.RoutedAliasAdminRequest.ROUTER_TYPE_NAME));
@@ -363,7 +361,6 @@ public class DimensionalRoutedAliasUpdateProcessorTest extends RoutedAliasUpdate
 
     CollectionAdminRequest.DimensionalRoutedAlias dra = CollectionAdminRequest.createDimensionalRoutedAlias(getAlias(),
         CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
-            .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
             .setMaxShardsPerNode(2), CRA_Dim, TRA_Dim);
 
     SolrParams params = dra.getParams();
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
index c8ae8b2..6eb122a 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
@@ -88,7 +88,7 @@ public class TemplateUpdateProcessorTest extends SolrCloudTestCase {
     params.add("commit", "true");
     UpdateRequest add = new UpdateRequest().add(solrDoc);
     add.setParams(params);
-    NamedList<Object> result = cluster.getSolrClient().request(CollectionAdminRequest.createCollection("c", "conf1", 1, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE));
+    NamedList<Object> result = cluster.getSolrClient().request(CollectionAdminRequest.createCollection("c", "conf1", 1, 1));
     Utils.toJSONString(result.asMap(4));
     AbstractFullDistribZkTestBase.waitForCollection(cluster.getSolrClient().getZkStateReader(), "c",1);
     cluster.getSolrClient().request(add, "c");
diff --git a/solr/core/src/test/org/apache/solr/util/TestExportTool.java b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
index 6427b48..7b34958 100644
--- a/solr/core/src/test/org/apache/solr/util/TestExportTool.java
+++ b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
@@ -56,7 +56,6 @@ public class TestExportTool extends SolrCloudTestCase {
     try {
       CollectionAdminRequest
           .createCollection(COLLECTION_NAME, "conf", 2, 1)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .setMaxShardsPerNode(100)
           .process(cluster.getSolrClient());
       cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
@@ -132,7 +131,6 @@ public class TestExportTool extends SolrCloudTestCase {
     try {
       CollectionAdminRequest
           .createCollection(COLLECTION_NAME, "conf", 8, 1)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .setMaxShardsPerNode(10)
           .process(cluster.getSolrClient());
       cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8);
diff --git a/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java b/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
index fc5dcb2..23f75ec 100644
--- a/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
+++ b/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
@@ -56,7 +56,6 @@ public class TestDistributedTracing extends SolrCloudTestCase {
     waitForSampleRateUpdated(1.0);
     CollectionAdminRequest
         .createCollection(COLLECTION, "config", 2, 2)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION, 2, 4);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
index f459a65..f811c5a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
@@ -30,7 +30,6 @@ import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -112,12 +111,6 @@ public interface DistribStateManager extends SolrCloseable {
     return tree;
   }
 
-  default PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
-    throw new UnsupportedOperationException("Not implemented");
-
-
-  }
-
   /**
    * Remove data recursively.
    * @param root root path
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 7724b58..5ad7ff4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -187,9 +187,6 @@ public class SolrClientCloudManager implements SolrCloudManager {
       return EMPTY;
     }
   }
-  public SolrZkClient getZkClient() {
-    return zkClient;
-  }
 
   @Override
   public DistributedQueueFactory getDistributedQueueFactory() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
index c1c30de..b15445b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
@@ -30,7 +30,6 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.common.AlreadyClosedException;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.AutoScalingParams;
@@ -208,9 +207,4 @@ public class ZkDistribStateManager implements DistribStateManager {
   public SolrZkClient getZkClient() {
     return zkClient;
   }
-
-  @Override
-  public PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
-    return PerReplicaStates.fetch(path, zkClient, null);
-  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 3ffebc3..432c901 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -56,7 +56,6 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.PER_REPLICA_STATE;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
@@ -95,7 +94,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       COLL_CONF,
       WITH_COLLECTION,
       COLOCATED_WITH,
-      PER_REPLICA_STATE,
       READ_ONLY);
 
   protected final CollectionAction action;
@@ -445,7 +443,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Integer nrtReplicas;
     protected Integer pullReplicas;
     protected Integer tlogReplicas;
-    protected Boolean perReplicaState;
 
     protected Properties properties;
     protected Boolean autoAddReplicas;
@@ -492,7 +489,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
-    public Create setPerReplicaState(Boolean b) {this.perReplicaState = b; return this; }
 
     public Create setAlias(String alias) {
       this.alias = alias;
@@ -511,7 +507,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Boolean getAutoAddReplicas() { return autoAddReplicas; }
     public Integer getNumTlogReplicas() {return tlogReplicas;}
     public Integer getNumPullReplicas() {return pullReplicas;}
-    public Boolean getPerReplicaState() {return perReplicaState;}
 
     public Integer getStateFormat() { return stateFormat; }
 
@@ -593,9 +588,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (tlogReplicas != null) {
         params.set(ZkStateReader.TLOG_REPLICAS, tlogReplicas);
       }
-      if (Boolean.TRUE.equals(perReplicaState)) {
-        params.set(PER_REPLICA_STATE, perReplicaState);
-      }
       if (rule != null) params.set(DocCollection.RULE, rule);
       if (snitch != null) params.set(DocCollection.SNITCH, snitch);
       params.setNonNull(POLICY, policy);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 133025a..1e44912 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -24,19 +23,15 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.noggit.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Immutable state of the cloud. Normally you can get the state by using
@@ -44,8 +39,6 @@ import org.slf4j.LoggerFactory;
  * @lucene.experimental
  */
 public class ClusterState implements JSONWriter.Writable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
 
   private final Integer znodeVersion;
 
@@ -263,14 +256,6 @@ public class ClusterState implements JSONWriter.Writable {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
-    if (Boolean.parseBoolean(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
-      if(log.isDebugEnabled()) {
-        log.debug("a collection {} has per-replica state", name);
-      }
-      //this collection has replica states stored outside
-      ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
-      if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
-    }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(DocCollection.SHARDS);
     if (sliceObjs == null) {
@@ -435,63 +420,5 @@ public class ClusterState implements JSONWriter.Writable {
   public int size() {
     return collectionStates.size();
   }
-  interface ReplicaStatesProvider {
-
-    Optional<ReplicaStatesProvider> get();
-
-    PerReplicaStates getStates();
-
-  }
-
-  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
-    @Override
-    public Optional<ReplicaStatesProvider> get() {
-      return Optional.empty();
-    }
-
-    @Override
-    public PerReplicaStates getStates() {
-      throw new RuntimeException("Invalid operation");
-    }
-
-  };
-
-  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
-
-
-  public static ReplicaStatesProvider getReplicaStatesProvider() {
-    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
-  }
-  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
-    REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
-  }
-
-
-  public static void clearReplicaStateProvider(){
-    REPLICASTATES_PROVIDER.remove();
-  }
-
-  private static class StatesProvider implements ReplicaStatesProvider {
-    private final Supplier<PerReplicaStates> replicaStatesSupplier;
-    private PerReplicaStates perReplicaStates;
-    private boolean isPerReplicaState = false;
-
-    public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
-      this.replicaStatesSupplier = replicaStatesSupplier;
-    }
-
-    @Override
-    public Optional<ReplicaStatesProvider> get() {
-      return isPerReplicaState ? Optional.of(this) : Optional.empty();
-    }
-
-    @Override
-    public PerReplicaStates getStates() {
-      if (perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
-      return perReplicaStates;
-    }
-
-  }
-
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index b72256e..38bbeae 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -34,8 +33,6 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.noggit.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@@ -50,12 +47,9 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
  */
 public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
 
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
-  public static final String PER_REPLICA_STATE = "perReplicaState";
   public static final String STATE_FORMAT = "stateFormat";
   /** @deprecated to be removed in Solr 9.0 (see SOLR-14930)
    *
@@ -86,10 +80,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Boolean autoAddReplicas;
   private final String policy;
   private final Boolean readOnly;
-  private final Boolean perReplicaState;
-  private final Map<String, Replica> replicaMap = new HashMap<>();
-  private volatile PerReplicaStates perReplicaStates;
-
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
     this(name, slices, props, router, Integer.MAX_VALUE, ZkStateReader.CLUSTER_STATE);
@@ -115,10 +105,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
     this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
     this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
-    this.perReplicaState = (Boolean) verifyProp(props, PER_REPLICA_STATE, Boolean.FALSE);
     Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
     this.policy = (String) props.get(Policy.POLICY);
-    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> perReplicaStates = it.getStates());
     this.autoAddReplicas = autoAddReplicas == null ? Boolean.FALSE : autoAddReplicas;
     Boolean readOnly = (Boolean) verifyProp(props, READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
@@ -134,9 +122,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       }
       for (Replica replica : slice.getValue()) {
         addNodeNameReplica(replica);
-        if (perReplicaState) {
-          replicaMap.put(replica.getName(), replica);
-        }
       }
     }
     this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
@@ -145,31 +130,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     assert name != null && slices != null;
   }
 
-  /**Update our state with a state of a {@link Replica}
-   * Used to create a new Collection State when only a replica is updated
-   */
-  public DocCollection copyWith( PerReplicaStates newPerReplicaStates) {
-    log.debug("collection :{} going to be updated :  per-replica state :{} -> {}",
-        name,
-        getChildNodesVersion(), newPerReplicaStates.cversion);
-    if (getChildNodesVersion() == newPerReplicaStates.cversion) return this;
-    Set<String> modifiedReplicas = PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
-    if (modifiedReplicas.isEmpty()) return this; //nothing is modified
-    Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
-    for (String s : modifiedReplicas) {
-      Replica replica = getReplica(s);
-      if (replica != null) {
-        Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
-        Slice shard = modifiedShards.get(replica.slice);
-        modifiedShards.put(replica.slice, shard.copyWith(newReplica));
-      }
-    }
-    DocCollection result = new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion, znode);
-    result.perReplicaStates = newPerReplicaStates;
-    return result;
-
-  }
-
   private void addNodeNameReplica(Replica replica) {
     List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
     if (replicas == null) {
@@ -202,7 +162,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       case PULL_REPLICAS:
       case TLOG_REPLICAS:
         return Integer.parseInt(o.toString());
-      case PER_REPLICA_STATE:
       case AUTO_ADD_REPLICAS:
       case READ_ONLY:
         return Boolean.parseBoolean(o.toString());
@@ -219,10 +178,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @param slices the new set of Slices
    * @return the resulting DocCollection
    */
-  public DocCollection copyWithSlices(Map<String, Slice> slices) {
-    DocCollection result = new DocCollection(getName(), slices, propMap, router, znodeVersion, znode);
-    result.perReplicaStates = perReplicaStates;
-    return result;
+  public DocCollection copyWithSlices(Map<String, Slice> slices){
+    return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
   }
 
   /**
@@ -296,16 +253,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public int getZNodeVersion(){
     return znodeVersion;
   }
-  public int getChildNodesVersion() {
-    return perReplicaStates == null ? -1 : perReplicaStates.cversion;
-  }
-
-  public boolean isModified(int dataVersion, int childVersion) {
-    if (dataVersion > znodeVersion) return true;
-    if (childVersion > getChildNodesVersion()) return true;
-    return false;
-
-  }
 
   public int getStateFormat() {
     return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2;
@@ -345,9 +292,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public String toString() {
-    return "DocCollection("+name+"/" + znode + "/" + znodeVersion+" "
-        + (perReplicaStates == null ? "": perReplicaStates.toString())+")="
-        + toJSONString(this);
+    return "DocCollection("+name+"/" + znode + "/" + znodeVersion + ")=" + toJSONString(this);
   }
 
   @Override
@@ -359,9 +304,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   }
 
   public Replica getReplica(String coreNodeName) {
-    if (perReplicaState) {
-      return replicaMap.get(coreNodeName);
-    }
     for (Slice slice : slices.values()) {
       Replica replica = slice.getReplica(coreNodeName);
       if (replica != null) return replica;
@@ -490,15 +432,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return policy;
   }
 
-  public boolean isPerReplicaState() {
-    return Boolean.TRUE.equals(perReplicaState);
-  }
-
-  public PerReplicaStates getPerReplicaStates() {
-    return perReplicaStates;
-  }
-
-
   public int getExpectedReplicaCount(Replica.Type type, int def) {
     Integer result = null;
     if (type == Replica.Type.NRT) result = numNrtReplicas;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
deleted file mode 100644
index 4de1a1a..0000000
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.common.cloud;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import org.apache.solr.cluster.api.SimpleMap;
-import org.apache.solr.common.MapWriter;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.annotation.JsonProperty;
-import org.apache.solr.common.util.ReflectMapWriter;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.WrappedSimpleMap;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.params.CommonParams.VERSION;
-
-/**
- * This represents the individual replica states in a collection
- * This is an immutable object. When states are modified, a new instance is constructed
- */
-public class PerReplicaStates implements ReflectMapWriter {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final char SEPARATOR = ':';
-  //no:of times to retry in case of a CAS failure
-  public static final int MAX_RETRIES = 5;
-
-
-  @JsonProperty
-  public final String path;
-
-  @JsonProperty
-  public final int cversion;
-
-  @JsonProperty
-  public final SimpleMap<State> states;
-
-  /**
-   * Construct with data read from ZK
-   * @param path path from where this is loaded
-   * @param cversion the current child version of the znode
-   * @param states the per-replica states (the list of all child nodes)
-   */
-  public PerReplicaStates(String path, int cversion, List<String> states) {
-    this.path = path;
-    this.cversion = cversion;
-    Map<String, State> tmp = new LinkedHashMap<>();
-
-    for (String state : states) {
-      State rs = State.parse(state);
-      if (rs == null) continue;
-      State existing = tmp.get(rs.replica);
-      if (existing == null) {
-        tmp.put(rs.replica, rs);
-      } else {
-        tmp.put(rs.replica, rs.insert(existing));
-      }
-    }
-    this.states = new WrappedSimpleMap<>(tmp);
-
-  }
-
-  /**Get the changed replicas
-   */
-  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
-    Set<String> result = new HashSet<>();
-    if (fresh == null) {
-      old.states.forEachKey(result::add);
-      return result;
-    }
-    old.states.forEachEntry((s, state) -> {
-      // the state is modified or missing
-      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
-    });
-    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
-    });
-    return result;
-  }
-
-
-  /**
-   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
-   * If this is not modified, the same object is returned
-   */
-  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
-    try {
-      if (current != null) {
-        Stat stat = zkClient.exists(current.path, null, true);
-        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
-        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
-      }
-      Stat stat = new Stat();
-      List<String> children = zkClient.getChildren(path, null, stat, true);
-      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
-    } catch (KeeperException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
-    } catch (InterruptedException e) {
-      SolrZkClient.checkInterrupted(e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
-    }
-  }
-
-
-  public static String getReplicaName(String s) {
-    int idx = s.indexOf(SEPARATOR);
-    if (idx > 0) {
-      return s.substring(0, idx);
-    }
-    return null;
-  }
-
-  public State get(String replica) {
-    return states.get(replica);
-  }
-
-  public static class Operation {
-    public final Type typ;
-    public final State state;
-
-    public Operation(Type typ, State replicaState) {
-      this.typ = typ;
-      this.state = replicaState;
-    }
-
-
-    public enum Type {
-      //add a new node
-      ADD,
-      //delete an existing node
-      DELETE
-    }
-
-    @Override
-    public String toString() {
-      return typ.toString() + " : " + state;
-    }
-  }
-
-
-  /**
-   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
-   */
-  public static class State implements MapWriter {
-
-    public final String replica;
-
-    public final Replica.State state;
-
-    public final Boolean isLeader;
-
-    public final int version;
-
-    public final String asString;
-
-    /**
-     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
-     * <p>
-     * the entry with '13' is the latest and the one with '12' is considered a duplicate
-     * <p>
-     * These are unlikely, but possible
-     */
-    final State duplicate;
-
-    private State(String serialized, List<String> pieces) {
-      this.asString = serialized;
-      replica = pieces.get(0);
-      version = Integer.parseInt(pieces.get(1));
-      String encodedStatus = pieces.get(2);
-      this.state = Replica.getState(encodedStatus);
-      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
-      duplicate = null;
-    }
-
-    public static State parse(String serialized) {
-      List<String> pieces = StrUtils.splitSmart(serialized, ':');
-      if (pieces.size() < 3) return null;
-      return new State(serialized, pieces);
-
-    }
-
-    public State(String replica, Replica.State state, Boolean isLeader, int version) {
-      this(replica, state, isLeader, version, null);
-    }
-
-    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
-      this.replica = replica;
-      this.state = state == null ? Replica.State.ACTIVE : state;
-      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
-      this.version = version;
-      asString = serialize();
-      this.duplicate = duplicate;
-    }
-
-    @Override
-    public void writeMap(EntryWriter ew) throws IOException {
-      ew.put(NAME, replica);
-      ew.put(VERSION, version);
-      ew.put(ZkStateReader.STATE_PROP, state.toString());
-      if (isLeader) ew.put(Slice.LEADER, isLeader);
-      ew.putIfNotNull("duplicate", duplicate);
-    }
-
-    private State insert(State duplicate) {
-      assert this.replica.equals(duplicate.replica);
-      if (this.version >= duplicate.version) {
-        if (this.duplicate != null) {
-          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
-        }
-        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
-      } else {
-        return duplicate.insert(this);
-      }
-    }
-
-    /**
-     * fetch duplicates entries for this replica
-     */
-    List<State> getDuplicates() {
-      if (duplicate == null) return Collections.emptyList();
-      List<State> result = new ArrayList<>();
-      State current = duplicate;
-      while (current != null) {
-        result.add(current);
-        current = current.duplicate;
-      }
-      return result;
-    }
-
-    private String serialize() {
-      StringBuilder sb = new StringBuilder(replica)
-          .append(":")
-          .append(version)
-          .append(":")
-          .append(state.shortName);
-      if (isLeader) sb.append(":").append("L");
-      return sb.toString();
-    }
-
-
-    @Override
-    public String toString() {
-      return asString;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof State) {
-        State that = (State) o;
-        return Objects.equals(this.asString, that.asString);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return asString.hashCode();
-    }
-  }
-
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
-    appendStates(sb);
-    return sb.append("]}").toString();
-  }
-
-  private StringBuilder appendStates(StringBuilder sb) {
-    states.forEachEntry(new BiConsumer<String, State>() {
-      int count = 0;
-      @Override
-      public void accept(String s, State state) {
-        if (count++ > 0) sb.append(", ");
-        sb.append(state.asString);
-        for (State d : state.getDuplicates()) sb.append(d.asString);
-      }
-    });
-    return sb;
-  }
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
deleted file mode 100644
index b028673..0000000
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.common.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Function;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Collections.singletonList;
-
-/**
- * This is a helper class that encapsulates various operations performed on the per-replica states
- * Do not directly manipulate the per replica states as it can become difficult to debug them
- */
-public class PerReplicaStatesOps {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private PerReplicaStates rs;
-  List<PerReplicaStates.Operation> ops;
-  private boolean preOp = true;
-  final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
-
-  PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
-    this.fun = fun;
-  }
-
-  /**
-   * Persist a set of operations to Zookeeper
-   */
-  private void persist(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    if (operations == null || operations.isEmpty()) return;
-    if (log.isDebugEnabled()) {
-      log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
-    }
-
-    List<Op> ops = new ArrayList<>(operations.size());
-    for (PerReplicaStates.Operation op : operations) {
-      //the state of the replica is being updated
-      String path = znode + "/" + op.state.asString;
-      ops.add(op.typ == PerReplicaStates.Operation.Type.ADD ?
-          Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
-          Op.delete(path, -1));
-    }
-    try {
-      zkClient.multi(ops, true);
-    } catch (KeeperException e) {
-      log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
-      throw e;
-    }
-
-  }
-
-  static List<PerReplicaStates.Operation> addDeleteStaleNodes(List<PerReplicaStates.Operation> ops, PerReplicaStates.State rs) {
-    while (rs != null) {
-      ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, rs));
-      rs = rs.duplicate;
-    }
-    return ops;
-  }
-
-  /**
-   * This is a persist operation with retry if a write fails due to stale state
-   */
-  public void persist(String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    List<PerReplicaStates.Operation> operations = ops;
-    for (int i = 0; i < PerReplicaStates.MAX_RETRIES; i++) {
-      try {
-        persist(operations, znode, zkClient);
-        return;
-      } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
-        //state is stale
-        if(log.isInfoEnabled()) {
-          log.info("stale state for {} , attempt: {}. retrying...", znode, i);
-        }
-        operations = refresh(PerReplicaStates.fetch(znode, zkClient, null));
-      }
-    }
-  }
-
-  public PerReplicaStates getPerReplicaStates() {
-    return rs;
-  }
-
-  /**
-   * state of a replica is changed
-   *
-   * @param newState the new state
-   */
-  public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(prs -> {
-      List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
-      PerReplicaStates.State existing = rs.get(replica);
-      if (existing == null) {
-        operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
-      } else {
-        operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, existing.isLeader, existing.version + 1)));
-        addDeleteStaleNodes(operations, existing);
-      }
-      if (log.isDebugEnabled()) {
-        log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
-      }
-      return operations;
-    }).init(rs);
-  }
-
-  /**
-   * Switch a collection from/to perReplicaState=true
-   */
-  public static PerReplicaStatesOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(prs -> enable ? enable(coll) : disable(prs)).init(rs);
-
-  }
-
-  private static List<PerReplicaStates.Operation> enable(DocCollection coll) {
-    List<PerReplicaStates.Operation> result = new ArrayList<>();
-    coll.forEachReplica((s, r) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(r.getName(), r.getState(), r.isLeader(), 0))));
-    return result;
-  }
-
-  private static List<PerReplicaStates.Operation> disable(PerReplicaStates prs) {
-    List<PerReplicaStates.Operation> result = new ArrayList<>();
-    prs.states.forEachEntry((s, state) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, state)));
-    return result;
-  }
-
-  /**
-   * Flip the leader replica to a new one
-   *
-   * @param allReplicas allReplicas of the shard
-   * @param next        next leader
-   */
-  public static PerReplicaStatesOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(prs -> {
-      List<PerReplicaStates.Operation> ops = new ArrayList<>();
-      if (next != null) {
-        PerReplicaStates.State st = rs.get(next);
-        if (st != null) {
-          if (!st.isLeader) {
-            ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
-            ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
-          }
-          //else do not do anything , that node is the leader
-        } else {
-          //there is no entry for the new leader.
-          //create one
-          ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
-        }
-      }
-
-      //now go through all other replicas and unset previous leader
-      for (String r : allReplicas) {
-        PerReplicaStates.State st = rs.get(r);
-        if (st == null) continue;//unlikely
-        if (!Objects.equals(r, next)) {
-          if (st.isLeader) {
-            //some other replica is the leader now. unset
-            ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
-            ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
-          }
-        }
-      }
-      if (log.isDebugEnabled()) {
-        log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
-      }
-      return ops;
-    }).init(rs);
-  }
-
-  /**
-   * Delete a replica entry from per-replica states
-   *
-   * @param replica name of the replica to be deleted
-   */
-  public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(prs -> {
-      List<PerReplicaStates.Operation> result;
-      if (rs == null) {
-        result = Collections.emptyList();
-      } else {
-        PerReplicaStates.State state = rs.get(replica);
-        result = addDeleteStaleNodes(new ArrayList<>(), state);
-      }
-      return result;
-    }).init(rs);
-  }
-
-  public static PerReplicaStatesOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(perReplicaStates -> singletonList(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD,
-        new PerReplicaStates.State(replica, state, isLeader, 0)))).init(rs);
-  }
-
-  /**
-   * mark a bunch of replicas as DOWN
-   */
-  public static PerReplicaStatesOps downReplicas(List<String> replicas, PerReplicaStates rs) {
-    return new PerReplicaStatesOps(prs -> {
-      List<PerReplicaStates.Operation> operations = new ArrayList<>();
-      for (String replica : replicas) {
-        PerReplicaStates.State r = rs.get(replica);
-        if (r != null) {
-          if (r.state == Replica.State.DOWN && !r.isLeader) continue;
-          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
-          addDeleteStaleNodes(operations, r);
-        } else {
-          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
-        }
-      }
-      if (log.isDebugEnabled()) {
-        log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
-      }
-      return operations;
-    }).init(rs);
-  }
-
-  /**
-   * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of states.json
-   * is updated
-   */
-  public static PerReplicaStatesOps touchChildren() {
-    PerReplicaStatesOps result = new PerReplicaStatesOps(prs -> {
-      List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
-      PerReplicaStates.State st = new PerReplicaStates.State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
-      operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, st));
-      operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
-      if (log.isDebugEnabled()) {
-        log.debug("touchChildren {}", operations);
-      }
-      return operations;
-    });
-    result.preOp = false;
-    result.ops = result.refresh(null);
-    return result;
-  }
-
-  PerReplicaStatesOps init(PerReplicaStates rs) {
-    if (rs == null) return null;
-    get(rs);
-    return this;
-  }
-
-  public List<PerReplicaStates.Operation> get() {
-    return ops;
-  }
-
-  public List<PerReplicaStates.Operation> get(PerReplicaStates rs) {
-    ops = refresh(rs);
-    if (ops == null) ops = Collections.emptyList();
-    this.rs = rs;
-    return ops;
-  }
-
-  /**
-   * To be executed before collection state.json is persisted
-   */
-  public boolean isPreOp() {
-    return preOp;
-  }
-
-  /**
-   * This method should compute the set of ZK operations for a given action
-   * for instance, a state change may result in 2 operations on per-replica states (1 CREATE and 1 DELETE)
-   * if a multi operation fails because the state got modified from behind,
-   * refresh the operation and try again
-   *
-   * @param prs The latest state
-   */
-  List<PerReplicaStates.Operation> refresh(PerReplicaStates prs) {
-    if (fun != null) return fun.apply(prs);
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return ops.toString();
-  }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index effad3e..eef3f89 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -16,9 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
@@ -26,11 +23,9 @@ import java.util.Set;
 
 import org.apache.solr.common.util.Utils;
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Replica extends ZkNodeProps {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
   /**
    * The replica's state. In general, if the node the replica is hosted on is
    * not under {@code /live_nodes} in ZK, the replica's state should be
@@ -48,7 +43,7 @@ public class Replica extends ZkNodeProps {
      * {@link ClusterState#liveNodesContain(String)}).
      * </p>
      */
-    ACTIVE("A"),
+    ACTIVE,
     
     /**
      * The first state before {@link State#RECOVERING}. A node in this state
@@ -59,13 +54,13 @@ public class Replica extends ZkNodeProps {
      * should not be relied on.
      * </p>
      */
-    DOWN("D"),
+    DOWN,
     
     /**
      * The node is recovering from the leader. This might involve peer-sync,
      * full replication or finding out things are already in sync.
      */
-    RECOVERING("R"),
+    RECOVERING,
     
     /**
      * Recovery attempts have not worked, something is not right.
@@ -75,16 +70,8 @@ public class Replica extends ZkNodeProps {
      * cluster and it's state should be discarded.
      * </p>
      */
-    RECOVERY_FAILED("F");
-
-    /**short name for a state. Used to encode this in the state node see {@link PerReplicaStates.State}
-     */
-    public final String shortName;
-
-    State(String c) {
-      this.shortName = c;
-    }
-
+    RECOVERY_FAILED;
+    
     @Override
     public String toString() {
       return super.toString().toLowerCase(Locale.ROOT);
@@ -92,7 +79,7 @@ public class Replica extends ZkNodeProps {
     
     /** Converts the state string to a State instance. */
     public static State getState(String stateStr) {
-      return stateStr == null ? null : Replica.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+      return stateStr == null ? null : State.valueOf(stateStr.toUpperCase(Locale.ROOT));
     }
   }
 
@@ -127,7 +114,6 @@ public class Replica extends ZkNodeProps {
   private final State state;
   private final Type type;
   public final String slice, collection;
-  private PerReplicaStates.State replicaState;
 
   public Replica(String name, Map<String,Object> propMap, String collection, String slice) {
     super(propMap);
@@ -143,23 +129,11 @@ public class Replica extends ZkNodeProps {
     Objects.requireNonNull(this.nodeName, "'node_name' must not be null");
     Objects.requireNonNull(this.core, "'core' must not be null");
     Objects.requireNonNull(this.type, "'type' must not be null");
-    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
-      log.debug("A replica  {} state fetched from per-replica state", name);
-      replicaState = it.getStates().get(name);
-      if (replicaState!= null) {
-        propMap.put(ZkStateReader.STATE_PROP, replicaState.state.toString().toLowerCase(Locale.ROOT));
-        if (replicaState.isLeader) propMap.put(Slice.LEADER, "true");
-      }
-    }) ;
-    if (replicaState == null) {
-      if (propMap.get(ZkStateReader.STATE_PROP) != null) {
-        this.state = Replica.State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
-      } else {
-        this.state = Replica.State.ACTIVE;                         //Default to ACTIVE
-        propMap.put(ZkStateReader.STATE_PROP, state.toString());
-      }
+    if (propMap.get(ZkStateReader.STATE_PROP) != null) {
+      this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
     } else {
-      this.state = replicaState.state;
+      this.state = State.ACTIVE;                         //Default to ACTIVE
+      propMap.put(ZkStateReader.STATE_PROP, state.toString());
     }
     propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.nodeName));
   }
@@ -216,7 +190,7 @@ public class Replica extends ZkNodeProps {
   }
 
   public boolean isActive(Set<String> liveNodes) {
-    return this.nodeName != null && liveNodes.contains(this.nodeName) && this.state == Replica.State.ACTIVE;
+    return this.nodeName != null && liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
   }
   
   public Type getType() {
@@ -234,40 +208,6 @@ public class Replica extends ZkNodeProps {
     return propertyValue;
   }
 
-  public Replica copyWith(PerReplicaStates.State state) {
-    log.debug("A replica is updated with new state : {}", state);
-    Map<String, Object> props = new LinkedHashMap<>(propMap);
-    if (state == null) {
-      props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
-      props.remove(Slice.LEADER);
-    } else {
-      props.put(ZkStateReader.STATE_PROP, state.state.toString());
-      if (state.isLeader) props.put(Slice.LEADER, "true");
-    }
-    Replica r = new Replica(name, props, collection, slice);
-    r.replicaState = state;
-    return r;
-  }
-
-  public PerReplicaStates.State getReplicaState() {
-    return replicaState;
-  }
-
-  private static final Map<String, State> STATES = new HashMap<>();
-  static {
-    STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE);
-    STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN);
-    STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING);
-    STATES.put(Replica.State.RECOVERY_FAILED.shortName, Replica.State.RECOVERY_FAILED);
-  }
-  public static State getState(String c) {
-  return STATES.get(c);
-  }
-
-  public boolean isLeader() {
-    if (replicaState != null) return replicaState.isLeader;
-     return getStr(Slice.LEADER) != null;
-  }
   @Override
   public String toString() {
     return name + ':' + Utils.toJSONString(propMap); // small enough, keep it on one line (i.e. no indent)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 802299a..4378ef7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -26,14 +25,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.solr.common.cloud.Replica.Type;
 import org.noggit.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.util.Utils.toJSONString;
 
@@ -41,8 +37,6 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
  */
 public class Slice extends ZkNodeProps implements Iterable<Replica> {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   public final String collection;
 
   /** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
@@ -67,14 +61,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values().iterator();
   }
 
-  /**Make a copy with a modified replica
-   */
-  public Slice copyWith(Replica modified) {
-    log.debug("modified replica : {}", modified);
-    Map<String, Replica> replicasCopy = new LinkedHashMap<>(replicas);
-    replicasCopy.put(modified.getName(), modified);
-    return new Slice(name, replicasCopy, propMap, collection);
-  }
   /** The slice's state. */
   public enum State {
 
@@ -121,7 +107,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
     /** Converts the state string to a State instance. */
     public static State getState(String stateStr) {
-      return Slice.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+      return State.valueOf(stateStr.toUpperCase(Locale.ROOT));
     }
   }
 
@@ -152,9 +138,9 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
     Object rangeObj = propMap.get(RANGE);
     if (propMap.get(ZkStateReader.STATE_PROP) != null) {
-      this.state = Slice.State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
+      this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
     } else {
-      this.state = Slice.State.ACTIVE;                         //Default to ACTIVE
+      this.state = State.ACTIVE;                         //Default to ACTIVE
       propMap.put(ZkStateReader.STATE_PROP, state.toString());
     }
     DocRouter.Range tmpRange = null;
@@ -224,7 +210,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   private Replica findLeader() {
     for (Replica replica : replicas.values()) {
-      if (replica.isLeader()) {
+      if (replica.getStr(LEADER) != null) {
         assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT: "Pull replica should not become leader!";
         return replica;
       }
@@ -249,10 +235,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values();
   }
 
-  public Set<String> getReplicaNames() {
-    return Collections.unmodifiableSet(replicas.keySet());
-  }
-
   /**
    * Gets all replicas that match a predicate
    */
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 6943fc5..9b1101e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -331,18 +331,6 @@ public class SolrZkClient implements Closeable {
   }
 
   /**
-   * Returns children of the node at the path
-   */
-  public List<String> getChildren(final String path, final Watcher watcher,Stat stat, boolean retryOnConnLoss)
-      throws KeeperException, InterruptedException {
-    if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher) , stat));
-    } else {
-      return keeper.getChildren(path, wrapWatcher(watcher), stat);
-    }
-  }
-
-  /**
    * Returns node's data
    */
   public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index e057bab..fef21b6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -396,7 +396,7 @@ public class ZkStateReader implements SolrCloseable {
       Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
       Set<String> updatedCollections = new HashSet<>();
       for (String coll : safeCopy) {
-        DocCollection newState = fetchCollectionState(coll, null, null);
+        DocCollection newState = fetchCollectionState(coll, null);
         if (updateWatchedCollection(coll, newState)) {
           updatedCollections.add(coll);
         }
@@ -444,7 +444,7 @@ public class ZkStateReader implements SolrCloseable {
       } else if (watchedCollectionStates.containsKey(collection)) {
         // Exists as a watched collection, force a refresh.
         log.debug("Forcing refresh of watched collection state for {}", collection);
-        DocCollection newState = fetchCollectionState(collection, null, null);
+        DocCollection newState = fetchCollectionState(collection, null);
         if (updateWatchedCollection(collection, newState)) {
           constructState(Collections.singleton(collection));
         }
@@ -774,7 +774,7 @@ public class ZkStateReader implements SolrCloseable {
 
   private class LazyCollectionRef extends ClusterState.CollectionRef {
     private final String collName;
-    private volatile long lastUpdateTime;
+    private long lastUpdateTime;
     private DocCollection cachedDocCollection;
 
     public LazyCollectionRef(String collName) {
@@ -789,12 +789,12 @@ public class ZkStateReader implements SolrCloseable {
       if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
         boolean shouldFetch = true;
         if (cachedDocCollection != null) {
-          Stat freshStats = null;
+          Stat exists = null;
           try {
-            freshStats = zkClient.exists(getCollectionPath(collName), null, true);
+            exists = zkClient.exists(getCollectionPath(collName), null, true);
           } catch (Exception e) {
           }
-          if (freshStats != null && !cachedDocCollection.isModified(freshStats.getVersion(), freshStats.getCversion())) {
+          if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
             shouldFetch = false;
           }
         }
@@ -972,16 +972,14 @@ public class ZkStateReader implements SolrCloseable {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
-    AtomicReference<DocCollection> coll = new AtomicReference<>();
+
     AtomicReference<Replica> leader = new AtomicReference<>();
     try {
       waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
         if (c == null)
           return false;
-        coll.set(c);
         Replica l = getLeader(n, c, shard);
         if (l != null) {
-          log.debug("leader found for {}/{} to be {}", collection, shard, l);
           leader.set(l);
           return true;
         }
@@ -1316,11 +1314,9 @@ public class ZkStateReader implements SolrCloseable {
    */
   class StateWatcher implements Watcher {
     private final String coll;
-    private final String collectionPath;
 
     StateWatcher(String coll) {
       this.coll = coll;
-      collectionPath = getCollectionPath(coll);
     }
 
     @Override
@@ -1342,29 +1338,18 @@ public class ZkStateReader implements SolrCloseable {
             event, coll, liveNodes.size());
       }
 
-      refreshAndWatch(event.getType());
+      refreshAndWatch();
 
     }
-    public void refreshAndWatch() {
-      refreshAndWatch(null);
-    }
 
     /**
      * Refresh collection state from ZK and leave a watch for future changes.
      * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
      * with the results of the refresh.
      */
-    public void refreshAndWatch(EventType eventType) {
+    public void refreshAndWatch() {
       try {
-        if (eventType == null || eventType == EventType.NodeChildrenChanged) {
-          refreshAndWatchChildren();
-          if (eventType == EventType.NodeChildrenChanged) {
-            //only per-replica states modified. return
-            return;
-          }
-        }
-
-        DocCollection newState = fetchCollectionState(coll, this, collectionPath);
+        DocCollection newState = fetchCollectionState(coll, this);
         updateWatchedCollection(coll, newState);
         synchronized (getUpdateLock()) {
           constructState(Collections.singleton(coll));
@@ -1380,29 +1365,6 @@ public class ZkStateReader implements SolrCloseable {
         log.error("Unwatched collection: [{}]", coll, e);
       }
     }
-
-    private void refreshAndWatchChildren() throws KeeperException, InterruptedException {
-      Stat stat = new Stat();
-      List<String> replicaStates = null;
-      try {
-        replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
-        PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
-        DocCollection oldState = watchedCollectionStates.get(coll);
-        final DocCollection newState = oldState != null ?
-            oldState.copyWith(newStates) :
-            fetchCollectionState(coll, null, collectionPath);
-        updateWatchedCollection(coll, newState);
-        synchronized (getUpdateLock()) {
-          constructState(Collections.singleton(coll));
-        }
-        if (log.isDebugEnabled()) {
-          log.debug("updated per-replica states changed for: {}, ver: {} , new vals: {}", coll, stat.getCversion(), replicaStates);
-        }
-
-      } catch (NoNodeException e) {
-        log.info("{} is deleted, stop watching children", collectionPath);
-      }
-    }
   }
 
   /**
@@ -1605,7 +1567,7 @@ public class ZkStateReader implements SolrCloseable {
 
   public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
     try {
-      return zkStateReader.fetchCollectionState(coll, null, null);
+      return zkStateReader.fetchCollectionState(coll, null);
     } catch (KeeperException e) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
     } catch (InterruptedException e) {
@@ -1614,24 +1576,14 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  public DocCollection fetchCollectionState(String coll, Watcher watcher, String path) throws KeeperException, InterruptedException {
-    String collectionPath = path == null ? getCollectionPath(coll) : path;
+  private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
+    String collectionPath = getCollectionPath(coll);
     while (true) {
-      ClusterState.initReplicaStateProvider(() -> {
-        try {
-          PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
-          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
-          return replicaStates;
-        } catch (Exception e) {
-          //TODO
-          throw new RuntimeException(e);
-        }
-      });
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
         ClusterState state = ClusterState.load(stat.getVersion(), data,
-            Collections.emptySet(), collectionPath);
+            Collections.<String>emptySet(), collectionPath);
         ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
         return collectionRef == null ? null : collectionRef.get();
       } catch (KeeperException.NoNodeException e) {
@@ -1645,8 +1597,6 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         return null;
-      } finally {
-        ClusterState.clearReplicaStateProvider();
       }
     }
   }
@@ -1765,7 +1715,6 @@ public class ZkStateReader implements SolrCloseable {
         v = new CollectionWatch<>();
         watchSet.set(true);
       }
-      log.info("already watching , added to stateWatchers");
       v.stateWatchers.add(stateWatcher);
       return v;
     });
@@ -1775,26 +1724,11 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
-    state = updatePerReplicaState(state);
     if (stateWatcher.onStateChanged(state) == true) {
       removeDocCollectionWatcher(collection, stateWatcher);
     }
   }
 
-  private DocCollection updatePerReplicaState(DocCollection c) {
-    if (c == null || !c.isPerReplicaState()) return c;
-    PerReplicaStates current = c.getPerReplicaStates();
-    PerReplicaStates newPrs = PerReplicaStates.fetch(c.getZNode(), zkClient, current);
-    if (newPrs != current) {
-      log.debug("just-in-time update for a fresh per-replica-state {}", c.getName());
-      DocCollection modifiedColl = c.copyWith(newPrs);
-      updateWatchedCollection(c.getName(), modifiedColl);
-      return modifiedColl;
-    } else {
-      return c;
-    }
-  }
-
   /**
    * Block until a CollectionStatePredicate returns true, or the wait times out
    *
@@ -1831,12 +1765,6 @@ public class ZkStateReader implements SolrCloseable {
     CollectionStateWatcher watcher = (n, c) -> {
       docCollection.set(c);
       boolean matches = predicate.matches(n, c);
-      if (!matches) {
-        if (log.isDebugEnabled()) {
-          log.debug(" CollectionStatePredicate failed for {}, cversion : {}", collection,
-                  (c == null || c.getPerReplicaStates() == null ? "-1" : c.getPerReplicaStates()));
-        }
-      }
       if (matches)
         latch.countDown();
 
@@ -2034,9 +1962,7 @@ public class ZkStateReader implements SolrCloseable {
           break;
         }
       } else {
-        int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
-        int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
-        if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) {
+        if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
           // no change to state, but we might have been triggered by the addition of a
           // state watcher, so run notifications
           updated = true;
@@ -2431,7 +2357,6 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   public DocCollection getCollection(String collection) {
-    return clusterState == null ? null : clusterState.getCollectionOrNull(collection);
+    return clusterState.getCollectionOrNull(collection);
   }
-
 }
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
index 566819b..9d22119 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
@@ -70,9 +70,7 @@ public class IndexingNestedDocuments extends SolrCloudTestCase {
    */
   public void testIndexingAnonKids() throws Exception {
     final String collection = "test_anon";
-    CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1).process(cluster.getSolrClient());
     cluster.getSolrClient().setDefaultCollection(collection);
     
     //
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
index 3915a00..4698c7e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
@@ -57,9 +57,7 @@ public class JsonRequestApiHeatmapFacetingTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
 
     indexSpatialData();
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
index 51679be..86a0d8f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
@@ -64,8 +64,7 @@ public class JsonRequestApiTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
index acb6d3e..04776cc 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
@@ -69,7 +69,6 @@ public class UsingSolrJRefGuideExamplesTest extends SolrCloudTestCase {
         .configure();
 
     CollectionAdminResponse response = CollectionAdminRequest.createCollection("techproducts", "conf", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("techproducts", 1, 1);
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 25582b6..9e38a12 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -60,10 +60,8 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -73,7 +71,6 @@ import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.handler.admin.CoreAdminHandler;
-import org.apache.solr.util.LogLevel;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -88,11 +85,10 @@ import org.slf4j.LoggerFactory;
  * This test would be faster if we simulated the zk state instead.
  */
 @Slow
-@LogLevel("org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
 public class CloudSolrClientTest extends SolrCloudTestCase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  
   private static final String COLLECTION = "collection1";
   private static final String COLLECTION2 = "2nd_collection";
 
@@ -115,8 +111,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     httpBasedCloudSolrClient = new CloudSolrClient.Builder(solrUrls).build();
   }
 
-
-  @After
+  
+  @After 
   public void tearDown() throws Exception {
     if (httpBasedCloudSolrClient != null) {
       try {
@@ -125,7 +121,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
         throw new RuntimeException(e);
       }
     }
-
+    
     shutdownCluster();
     super.tearDown();
   }
@@ -139,17 +135,15 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
    * Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
    */
   private CloudSolrClient getRandomClient() {
-    return random().nextBoolean() ? cluster.getSolrClient() : httpBasedCloudSolrClient;
+    return random().nextBoolean()? cluster.getSolrClient(): httpBasedCloudSolrClient;
   }
 
   @Test
   public void testParallelUpdateQTime() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION, 2, 2);
     UpdateRequest req = new UpdateRequest();
-    for (int i = 0; i < 10; i++) {
+    for (int i=0; i<10; i++)  {
       SolrInputDocument doc = new SolrInputDocument();
       doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
       req.add(doc);
@@ -163,10 +157,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
   public void testOverwriteOption() throws Exception {
 
     CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
         .processAndWait(cluster.getSolrClient(), TIMEOUT);
     cluster.waitForActiveCollection("overwrite", 1, 1);
-
+    
     new UpdateRequest()
         .add("id", "0", "a_t", "hello1")
         .add("id", "0", "a_t", "hello2")
@@ -179,7 +172,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
         .add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
         .add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
         .commit(cluster.getSolrClient(), "overwrite");
-
+      
     resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
     assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
 
@@ -187,14 +180,10 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
   @Test
   public void testAliasHandling() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION, 2, 2);
 
-    CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION2, 2, 2);
 
     CloudSolrClient client = getRandomClient();
@@ -239,27 +228,25 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
   @Test
   public void testRouting() throws Exception {
-    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("routing_collection", 2, 2);
-
+    
     AbstractUpdateRequest request = new UpdateRequest()
         .add(id, "0", "a_t", "hello1")
         .add(id, "2", "a_t", "hello2")
         .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
-
+    
     // Test single threaded routed updates for UpdateRequest
     NamedList<Object> response = getRandomClient().request(request, "routing_collection");
     if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
       checkSingleServer(response);
     }
     CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
-    Map<String, LBHttpSolrClient.Req> routes = rr.getRoutes();
-    Iterator<Map.Entry<String, LBHttpSolrClient.Req>> it = routes.entrySet()
+    Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
+    Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
         .iterator();
     while (it.hasNext()) {
-      Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
+      Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
       String url = entry.getKey();
       UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
           .getRequest();
@@ -275,9 +262,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
         assertTrue(docList.getNumFound() == 1);
       }
     }
-
+    
     // Test the deleteById routing for UpdateRequest
-
+    
     final UpdateResponse uResponse = new UpdateRequest()
         .deleteById("0")
         .deleteById("2")
@@ -289,7 +276,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
     SolrDocumentList docs = qResponse.getResults();
     assertEquals(0, docs.getNumFound());
-
+    
     // Test Multi-Threaded routed updates for UpdateRequest
     try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
         (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
@@ -305,7 +292,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       it = routes.entrySet()
           .iterator();
       while (it.hasNext()) {
-        Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
+        Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
         String url = entry.getKey();
         UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
             .getRequest();
@@ -347,7 +334,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     }
 
     assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
-            + "; all=" + requestCountsMap.keySet(),
+        + "; all=" + requestCountsMap.keySet(),
         expectedBaseURLs.size() < requestCountsMap.size());
 
     // Calculate a number of shard keys that route to the same shard.
@@ -357,7 +344,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     } else {
       n = random().nextInt(9) + 2;
     }
-
+    
     List<String> sameShardRoutes = Lists.newArrayList();
     sameShardRoutes.add("0");
     for (int i = 1; i < n; i++) {
@@ -425,7 +412,6 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     // all its cores on the same node.
     // Hence the below configuration for our collection
     CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(liveNodes * liveNodes)
         .processAndWait(cluster.getSolrClient(), TIMEOUT);
     cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
@@ -445,7 +431,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
   private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
                                               boolean useShardsPreference,
                                               String collectionName)
-      throws Exception {
+      throws Exception
+  {
     SolrQuery qRequest = new SolrQuery("*:*");
 
     ModifiableSolrParams qParams = new ModifiableSolrParams();
@@ -464,18 +451,18 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
 
     Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
-    assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
 
     // Iterate over shards-info and check what cores responded
-    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
     @SuppressWarnings({"unchecked"})
     Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
     List<String> shardAddresses = new ArrayList<String>();
     while (itr.hasNext()) {
       Map.Entry<String, ?> e = itr.next();
-      assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
-      String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
-      assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
       shardAddresses.add(shardAddress);
     }
     if (log.isInfoEnabled()) {
@@ -484,14 +471,14 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
     // Make sure the distributed queries were directed to a single node only
     Set<Integer> ports = new HashSet<Integer>();
-    for (String shardAddr : shardAddresses) {
-      URL url = new URL(shardAddr);
+    for (String shardAddr: shardAddresses) {
+      URL url = new URL (shardAddr);
       ports.add(url.getPort());
     }
 
     // This assertion would hold true as long as every shard has a core on each node
-    assertTrue("Response was not received from shards on a single node",
-        shardAddresses.size() > 1 && ports.size() == 1);
+    assertTrue ("Response was not received from shards on a single node",
+        shardAddresses.size() > 1 && ports.size()==1);
   }
 
   /**
@@ -504,8 +491,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     int liveNodes = cluster.getJettySolrRunners().size();
 
     // For testing replica.type, we want to have all replica types available for the collection
-    CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes / 3, liveNodes / 3, liveNodes / 3)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
         .setMaxShardsPerNode(liveNodes)
         .processAndWait(cluster.getSolrClient(), TIMEOUT);
     cluster.waitForActiveCollection(collectionName, 1, liveNodes);
@@ -524,9 +510,10 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
   }
 
   private void queryReplicaType(CloudSolrClient cloudClient,
-                                Replica.Type typeToQuery,
-                                String collectionName)
-      throws Exception {
+                                          Replica.Type typeToQuery,
+                                          String collectionName)
+      throws Exception
+  {
     SolrQuery qRequest = new SolrQuery("*:*");
 
     ModifiableSolrParams qParams = new ModifiableSolrParams();
@@ -539,21 +526,21 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
 
     Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
-    assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
 
     // Iterate over shards-info and check what cores responded
-    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
     @SuppressWarnings({"unchecked"})
     Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
     List<String> shardAddresses = new ArrayList<String>();
     while (itr.hasNext()) {
       Map.Entry<String, ?> e = itr.next();
-      assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
-      String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
       if (shardAddress.endsWith("/")) {
         shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
       }
-      assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
       shardAddresses.add(shardAddress);
     }
     assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
@@ -570,7 +557,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       SolrServerException, IOException {
 
     NamedList<Object> resp;
-    try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/" + collectionName, 15000, 60000)) {
+    try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/"+ collectionName, 15000, 60000)) {
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set("qt", "/admin/mbeans");
       params.set("stats", "true");
@@ -587,12 +574,12 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       name = category + "." + (scope != null ? scope : key) + ".requests";
     }
     @SuppressWarnings({"unchecked"})
-    Map<String, Object> map = (Map<String, Object>) resp.findRecursive("solr-mbeans", category, key, "stats");
+    Map<String,Object> map = (Map<String,Object>)resp.findRecursive("solr-mbeans", category, key, "stats");
     if (map == null) {
       return null;
     }
     if (scope != null) { // admin handler uses a meter instead of counter here
-      return (Long) map.get(name + ".count");
+      return (Long)map.get(name + ".count");
     } else {
       return (Long) map.get(name);
     }
@@ -655,10 +642,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
 
       String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
-          .setPerReplicaState(USE_PER_REPLICA_STATE)
           .processAsync(client);
       String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
-          .setPerReplicaState(USE_PER_REPLICA_STATE)
           .processAsync(client);
 
       CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
@@ -724,7 +709,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
       QueryResponse rsp = solrClient.query(q);
       @SuppressWarnings({"rawtypes"})
-      Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size() - 1);
+      Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
       assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
       assertNotNull(m.get(COLLECTION));
     }
@@ -741,19 +726,19 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
     for (String s : liveNodes) {
       String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
-      if (!allNodesOfColl.contains(n)) {
+      if(!allNodesOfColl.contains(n)){
         theNode = n;
         break;
       }
     }
-    log.info("the node which does not serve this collection{} ", theNode);
+    log.info("the node which does not serve this collection{} ",theNode);
     assertNotNull(theNode);
 
 
     final String solrClientUrl = theNode + "/" + COLLECTION;
     try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
 
-      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1));
+      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
       try {
         QueryResponse rsp = solrClient.query(q);
         log.info("error was expected");
@@ -801,11 +786,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
   @Test
   public void testVersionsAreReturned() throws Exception {
-    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("versions_collection", 2, 2);
-
+    
     // assert that "adds" are returned
     UpdateRequest updateRequest = new UpdateRequest()
         .add("id", "1", "a_t", "hello1")
@@ -814,7 +797,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
     NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
     Object addsObject = response.get("adds");
-
+    
     assertNotNull("There must be a adds parameter", addsObject);
     assertTrue(addsObject instanceof NamedList<?>);
     NamedList<?> adds = (NamedList<?>) addsObject;
@@ -849,12 +832,10 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     NamedList deletes = (NamedList) deletesObject;
     assertEquals("There must be 1 version", 1, deletes.size());
   }
-
+  
   @Test
   public void testInitializationWithSolrUrls() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION, 2, 2);
     CloudSolrClient client = httpBasedCloudSolrClient;
     SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
@@ -877,13 +858,12 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
     final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
     final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
-
+    
     // start with exactly 1 shard/replica...
     assertEquals("Couldn't create collection", 0,
-        CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
-            .setPerReplicaState(USE_PER_REPLICA_STATE)
-            .setCreateNodeSet(old_leader_node.getNodeName())
-            .process(cluster.getSolrClient()).getStatus());
+                 CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
+                 .setCreateNodeSet(old_leader_node.getNodeName())
+                 .process(cluster.getSolrClient()).getStatus());
     cluster.waitForActiveCollection(COL, 1, 1);
 
     // determine the coreNodeName of only current replica
@@ -902,47 +882,47 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       // don't let collection cache entries get expired, even on a slow machine...
       stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
       stale_client.setDefaultCollection(COL);
-
+     
       // do a query to populate stale_client's cache...
       assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-
+      
       // add 1 replica on a diff node...
       assertEquals("Couldn't create collection", 0,
-          CollectionAdminRequest.addReplicaToShard(COL, "shard1")
-              .setNode(new_leader_node.getNodeName())
-              // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
-              .process(cluster.getSolrClient()).getStatus());
+                   CollectionAdminRequest.addReplicaToShard(COL, "shard1")
+                   .setNode(new_leader_node.getNodeName())
+                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
+                   .process(cluster.getSolrClient()).getStatus());
       AbstractDistribZkTestBase.waitForRecoveriesToFinish
-          (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
-
+        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
+      
       // ...and delete our original leader.
       assertEquals("Couldn't create collection", 0,
-          CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
-              // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
-              .process(cluster.getSolrClient()).getStatus());
+                   CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
+                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
+                   .process(cluster.getSolrClient()).getStatus());
       AbstractDistribZkTestBase.waitForRecoveriesToFinish
-          (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
+        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
 
       // stale_client's collection state cache should now only point at a leader that no longer exists.
-
+      
       // attempt a (direct) update that should succeed in spite of cached cluster state
       // pointing solely to a node that's no longer part of our collection...
       assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
       assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-
+      
     }
   }
-
+  
 
   private static void checkSingleServer(NamedList<Object> response) {
     final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
-    final Map<String, LBHttpSolrClient.Req> routes = rr.getRoutes();
-    final Iterator<Map.Entry<String, LBHttpSolrClient.Req>> it =
+    final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
+    final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
         routes.entrySet().iterator();
     while (it.hasNext()) {
-      Map.Entry<String, LBHttpSolrClient.Req> entry = it.next();
-      assertEquals("wrong number of servers: " + entry.getValue().getServers(),
-          1, entry.getValue().getServers().size());
+      Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
+        assertEquals("wrong number of servers: "+entry.getValue().getServers(),
+            1, entry.getValue().getServers().size());
     }
   }
 
@@ -962,11 +942,10 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     // Hence the below configuration for our collection
     int pullReplicas = Math.max(1, liveNodes - 2);
     CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
         .setMaxShardsPerNode(liveNodes)
         .processAndWait(cluster.getSolrClient(), TIMEOUT);
     cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
-
+    
     // Add some new documents
     new UpdateRequest()
         .add(id, "0", "a_t", "hello1")
@@ -996,7 +975,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
                                            String preferReplicaTypes,
                                            boolean preferLocalShards,
                                            String collectionName)
-      throws Exception {
+      throws Exception
+  {
     SolrQuery qRequest = new SolrQuery("*:*");
     ModifiableSolrParams qParams = new ModifiableSolrParams();
 
@@ -1017,7 +997,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
       rule.append(":local");
     }
-    qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
+    qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());  
     qParams.add(ShardParams.SHARDS_INFO, "true");
     qRequest.add(qParams);
 
@@ -1028,7 +1008,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
 
     Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
-    assertNotNull("Unable to obtain " + ShardParams.SHARDS_INFO, shardsInfo);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
 
     Map<String, String> replicaTypeMap = new HashMap<String, String>();
     DocCollection collection = getCollectionState(collectionName);
@@ -1045,15 +1025,15 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     }
 
     // Iterate over shards-info and check that replicas of correct type responded
-    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>) shardsInfo;
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
     @SuppressWarnings({"unchecked"})
     Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
     List<String> shardAddresses = new ArrayList<String>();
     while (itr.hasNext()) {
       Map.Entry<String, ?> e = itr.next();
-      assertTrue("Did not find map-type value in " + ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
-      String shardAddress = (String) ((Map) e.getValue()).get("shardAddress");
-      assertNotNull(ShardParams.SHARDS_INFO + " did not return 'shardAddress' parameter", shardAddress);
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
       assertTrue(replicaTypeMap.containsKey(shardAddress));
       assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
       shardAddresses.add(shardAddress);
@@ -1067,9 +1047,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
   @Test
   public void testPing() throws Exception {
     final String testCollection = "ping_test";
-    CollectionAdminRequest.createCollection(testCollection, "conf", 2, 1)
-        .setPerReplicaState(USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(testCollection, "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection(testCollection, 2, 2);
     final SolrClient clientUnderTest = getRandomClient();
 
@@ -1078,24 +1056,4 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     assertEquals("This should be OK", 0, response.getStatus());
   }
 
-  public void testPerReplicaStateCollection() throws Exception {
-    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
-        .process(cluster.getSolrClient());
-
-    final String testCollection = "perReplicaState_test";
-    int liveNodes = cluster.getJettySolrRunners().size();
-    CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
-        .setMaxShardsPerNode(liveNodes)
-        .setPerReplicaState(Boolean.TRUE)
-        .process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(testCollection, 2, 4);
-    final SolrClient clientUnderTest = getRandomClient();
-    final SolrPingResponse response = clientUnderTest.ping(testCollection);
-    assertEquals("This should be OK", 0, response.getStatus());
-    DocCollection c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
-    c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
-    PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
-    assertEquals(4, prs.states.size());
-  }
-
 }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index 22d6a0c..e8aef51 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -126,8 +126,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
 
     for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
       CollectionAdminRequest.createCollection(collection, "_default", 2, 2)
-          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-          .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
+        .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
         .process(cluster.getSolrClient());
     }
     
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index 012b21a..397d655 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -74,9 +74,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     } else {
       collection = COLLECTIONORALIAS;
     }
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index b7eaa64..edef269 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -65,10 +65,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index 2fa0dd0..add4331 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -68,9 +68,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     } else {
       collection = COLLECTIONORALIAS;
     }
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 4d77540..6c88ffe 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -96,8 +96,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collection, 2, 2);
     
@@ -2680,8 +2679,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("destinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -2775,8 +2773,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -2874,8 +2871,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelDaemonUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3049,8 +3045,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   public void testParallelTerminatingDaemonUpdateStream() throws Exception {
     Assume.assumeTrue(!useAlias);
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3236,8 +3231,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("destinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -3330,8 +3324,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -3429,8 +3422,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelDaemonCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3647,14 +3639,11 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   public void testClassifyStream() throws Exception {
     Assume.assumeTrue(!useAlias);
 
-    CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("modelCollection", 2, 2);
-    CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("uknownCollection", 2, 2);
-    CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).process(cluster.getSolrClient());
     cluster.waitForActiveCollection("checkpointCollection", 2, 2);
 
     UpdateRequest updateRequest = new UpdateRequest();
@@ -3878,14 +3867,11 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
   @Test
   public void testExecutorStream() throws Exception {
-    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("workQueue", 2, 2);
-    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("mainCorpus", 2, 2);
-    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("destination", 2, 2);
 
     UpdateRequest workRequest = new UpdateRequest();
@@ -3947,14 +3933,11 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
   @Test
   public void testParallelExecutorStream() throws Exception {
-    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
 
-    CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
 
-    CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
 
     cluster.waitForActiveCollection("workQueue1", 2, 2);
     cluster.waitForActiveCollection("mainCorpus1", 2, 2);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
index 9563da7..48f13c2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
@@ -56,9 +56,7 @@ public class DirectJsonQueryRequestFacetingIntegrationTest extends SolrCloudTest
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
index 9d5c3e6..f4406c1 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
@@ -63,9 +63,7 @@ public class JsonQueryRequestFacetingIntegrationTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
index 977d405..1ccd581 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
@@ -47,9 +47,7 @@ public class JsonQueryRequestHeatmapFacetingTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
 
     indexSpatialData();
   }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
index 6d546a4..1ef806e 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
@@ -97,7 +97,6 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     assertFalse("CloudCollectionsListener not triggered after registration", newResults.get(2).contains("testcollection1"));
 
     CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -111,7 +110,6 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     client.getZkStateReader().removeCloudCollectionsListener(watcher1);
 
     CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection2", 4, 4);
 
@@ -138,12 +136,10 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     CloudSolrClient client = cluster.getSolrClient();
 
     CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection1", 4, 4);
     
     CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection2", 4, 4);
 
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index d78f0e8..ab3dc95 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -33,7 +33,6 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,8 +121,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     // note: one node in our cluster is unsed by collection
     CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
@@ -171,8 +169,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     final CountDownLatch latch = new CountDownLatch(1);
     client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -202,8 +199,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
@@ -221,7 +217,6 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   }
 
   @Test
-  @Ignore
   public void testCanWaitForNonexistantCollection() throws Exception {
 
     Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
@@ -252,8 +247,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -307,8 +301,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @Test
   public void testDeletionsTriggerWatches() throws Exception {
     CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(cluster.getSolrClient());
+      .process(cluster.getSolrClient());
     
     Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                                               (l, c) -> c == null);
@@ -322,9 +315,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   public void testLiveNodeChangesTriggerWatches() throws Exception {
     final CloudSolrClient client = cluster.getSolrClient();
     
-    CollectionAdminRequest.createCollection("test_collection", "config", 1, 1)
-        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
-        .process(client);
+    CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
 
     Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                                               (l, c) -> (l.size() == 1 + CLUSTER_SIZE));
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
deleted file mode 100644
index b6ea6f7..0000000
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.common.cloud;
-
-
-import java.util.Set;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.cloud.Replica.State;
-import org.apache.zookeeper.CreateMode;
-import org.junit.After;
-import org.junit.Before;
-
-public class TestPerReplicaStates extends SolrCloudTestCase {
-  @Before
-  public void prepareCluster() throws Exception {
-    configureCluster(4)
-        .configure();
-  }
-
-  @After
-  public void tearDownCluster() throws Exception {
-    shutdownCluster();
-  }
-
-  public void testBasic() {
-    PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
-    assertEquals("R1:1:A", rs.asString);
-
-    rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
-    assertEquals("R1:1:D:L", rs.asString);
-    rs = PerReplicaStates.State.parse (rs.asString);
-    assertEquals(State.DOWN, rs.state);
-
-  }
-
-  public void testEntries() {
-    PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
-    assertEquals(2, entries.get("R1").version);
-    entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
-    assertEquals(2, entries.get("R1").version);
-    assertEquals(2, entries.get("R1").getDuplicates().size());
-    Set<String> modified = PerReplicaStates.findModifiedReplicas(entries,  new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
-    assertEquals(1, modified.size());
-    assertTrue(modified.contains("R3"));
-    modified = PerReplicaStates.findModifiedReplicas( entries,
-        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
-    assertEquals(2, modified.size());
-    assertTrue(modified.contains("R3"));
-    assertTrue(modified.contains("R4"));
-    modified = PerReplicaStates.findModifiedReplicas( entries,
-        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
-    assertEquals(3, modified.size());
-    assertTrue(modified.contains("R3"));
-    assertTrue(modified.contains("R4"));
-    assertTrue(modified.contains("R2"));
-
-
-  }
-
-  public void testReplicaStateOperations() throws Exception {
-    String root = "/testReplicaStateOperations";
-    cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
-
-    ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
-
-    for (String state : states) {
-      cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
-    }
-
-    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
-    PerReplicaStates rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    assertEquals(3, rs.states.size());
-    assertTrue(rs.cversion >= 5);
-
-    PerReplicaStatesOps ops = PerReplicaStatesOps.addReplica("R5",State.ACTIVE, false, rs);
-    assertEquals(1, ops.get().size());
-    assertEquals(PerReplicaStates.Operation.Type.ADD , ops.ops.get(0).typ );
-    ops.persist(root,cluster.getZkClient());
-    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    assertEquals(4, rs.states.size());
-    assertTrue(rs.cversion >= 6);
-    assertEquals(6,  cluster.getZkClient().getChildren(root, null,true).size());
-    ops =  PerReplicaStatesOps.flipState("R1", State.DOWN , rs);
-
-    assertEquals(4, ops.ops.size());
-    assertEquals(PerReplicaStates.Operation.Type.ADD,  ops.ops.get(0).typ);
-    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(1).typ);
-    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(2).typ);
-    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(3).typ);
-    ops.persist(root, cluster.getZkClient());
-    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    assertEquals(4, rs.states.size());
-    assertEquals(3, rs.states.get("R1").version);
-
-    ops =  PerReplicaStatesOps.deleteReplica("R5" , rs);
-    assertEquals(1, ops.ops.size());
-    ops.persist(root,cluster.getZkClient());
-
-    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    assertEquals(3, rs.states.size());
-
-    ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs);
-    assertEquals(2, ops.ops.size());
-    assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
-    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
-    ops.persist(root,cluster.getZkClient());
-    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    ops =  PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs);
-    assertEquals(4, ops.ops.size());
-    ops.persist(root,cluster.getZkClient());
-    rs =PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
-    assertTrue(rs.get("R3").isLeader);
-  }
-
-}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 40b35f8..b646e2e 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -57,7 +57,6 @@ import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +81,6 @@ import org.slf4j.LoggerFactory;
 public class SolrCloudTestCase extends SolrTestCaseJ4 {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static Boolean USE_PER_REPLICA_STATE =  Boolean.FALSE;
 
   public static final int DEFAULT_TIMEOUT = 45; // this is an important timeout for test stability - can't be too short
 
@@ -95,14 +93,6 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
     }
   }
 
-  @BeforeClass
-  public static void b4Class() {
-    USE_PER_REPLICA_STATE = Boolean.parseBoolean(System.getProperty("use.perreplica", String.valueOf(random().nextBoolean())));
-    if(USE_PER_REPLICA_STATE) {
-      log.info("Using per-replica state");
-    }
-  }
-
   /**
    * Builder class for a MiniSolrCloudCluster
    */