Posted to commits@lucene.apache.org by no...@apache.org on 2021/01/05 08:28:21 UTC

[lucene-solr] branch jira/solr-15052 updated: porting changes from jira/solr-15052-8x

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr-15052
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-15052 by this push:
     new 5c5cd01  porting changes from jira/solr-15052-8x
5c5cd01 is described below

commit 5c5cd0151dc82a287235e241ef87dc03a9178341
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Jan 5 19:27:54 2021 +1100

    porting changes from jira/solr-15052-8x
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/cloud/ShardLeaderElectionContextBase.java |  18 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  55 +-
 .../OverseerCollectionMessageHandler.java          |  26 +-
 .../solr/cloud/overseer/CollectionMutator.java     |  26 +-
 .../apache/solr/cloud/overseer/NodeMutator.java    |  11 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  19 +-
 .../apache/solr/cloud/overseer/SliceMutator.java   |  47 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  74 ++-
 .../apache/solr/cloud/overseer/ZkWriteCommand.java |  20 +
 .../solr/handler/admin/CollectionsHandler.java     |  10 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  25 +-
 .../org/apache/solr/cloud/ZkSolrClientTest.java    |  31 ++
 .../solr/handler/PingRequestHandlerTest.java       |   2 +
 .../org/apache/solr/handler/TestSQLHandler.java    |   4 +-
 .../solr/handler/TestStressThreadBackup.java       |  13 +-
 .../solr/handler/admin/HealthCheckHandlerTest.java |   1 +
 .../handler/admin/MetricsHistoryHandlerTest.java   |   3 +-
 .../solr/handler/component/SearchHandlerTest.java  |   4 +
 .../transform/TestSubQueryTransformerDistrib.java  |   2 +
 .../apache/solr/schema/TestManagedSchemaAPI.java   |   2 +
 .../search/facet/TestCloudJSONFacetJoinDomain.java |   1 +
 .../solr/search/facet/TestCloudJSONFacetSKG.java   |  12 +-
 .../search/facet/TestCloudJSONFacetSKGEquiv.java   |  14 +-
 .../search/join/CrossCollectionJoinQueryTest.java  |   2 +
 .../apache/solr/search/stats/TestDistribIDF.java   |   3 +-
 .../processor/TemplateUpdateProcessorTest.java     |   2 +-
 .../solr/util/tracing/TestDistributedTracing.java  |   1 +
 .../client/solrj/cloud/DistribStateManager.java    |   7 +
 .../client/solrj/impl/SolrClientCloudManager.java  |   5 +-
 .../client/solrj/impl/ZkDistribStateManager.java   |   6 +
 .../solrj/request/CollectionAdminRequest.java      |   8 +
 .../org/apache/solr/common/cloud/ClusterState.java |  71 +++
 .../apache/solr/common/cloud/DocCollection.java    |  82 ++-
 .../apache/solr/common/cloud/PerReplicaStates.java | 600 +++++++++++++++++++++
 .../java/org/apache/solr/common/cloud/Replica.java |  61 ++-
 .../java/org/apache/solr/common/cloud/Slice.java   |  26 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  12 +
 .../apache/solr/common/cloud/ZkStateReader.java    |  93 +++-
 .../IndexingNestedDocuments.java                   |   7 +-
 .../JsonRequestApiHeatmapFacetingTest.java         |   4 +-
 .../ref_guide_examples/JsonRequestApiTest.java     |   3 +-
 .../UsingSolrJRefGuideExamplesTest.java            |   1 +
 .../solrj/io/stream/CloudAuthStreamTest.java       |   8 +-
 .../client/solrj/io/stream/JDBCStreamTest.java     |   4 +-
 .../client/solrj/io/stream/MathExpressionTest.java |   5 +-
 .../solrj/io/stream/SelectWithEvaluatorsTest.java  |   4 +-
 .../solrj/io/stream/StreamDecoratorTest.java       |  51 +-
 ...ectJsonQueryRequestFacetingIntegrationTest.java |   8 +-
 .../JsonQueryRequestFacetingIntegrationTest.java   |   8 +-
 .../json/JsonQueryRequestHeatmapFacetingTest.java  |   4 +-
 .../cloud/TestCloudCollectionsListeners.java       |   4 +
 .../common/cloud/TestCollectionStateWatchers.java  |  22 +-
 .../solr/common/cloud/TestPerReplicaStates.java    | 137 +++++
 .../org/apache/solr/cloud/SolrCloudTestCase.java   |   1 +
 55 files changed, 1500 insertions(+), 172 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5426eb8..2ee1837 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -328,6 +328,8 @@ New Features
 
 * SOLR-14615: CPU Utilization Based Circuit Breaker (Atri Sharma)
 
+* SOLR-15052: Reducing overseer bottlenecks using per-replica states (noble, Ishan Chattopadhyaya)
+
 Improvements
 ---------------------
 
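As an illustration of how the feature in the SOLR-15052 entry above is switched on from client code, here is a minimal SolrJ sketch. It is not part of this commit; the class name, collection name, configset, and ZooKeeper address are placeholders, and it assumes the setPerReplicaState setter this commit adds to CollectionAdminRequest (see the CollectionAdminRequest.java change further down) accepts a Boolean.

    import java.util.Collections;
    import java.util.Optional;

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;

    public class PerReplicaStateCreateExample {
      public static void main(String[] args) throws Exception {
        // Connect to a SolrCloud cluster through its ZooKeeper ensemble (address is a placeholder).
        try (CloudSolrClient client = new CloudSolrClient.Builder(
                Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
          // Create a collection whose replica states are kept as individual znodes
          // under the collection's state node instead of being routed through the Overseer.
          CollectionAdminRequest.createCollection("prs_demo", "conf", 2, 2)
              .setPerReplicaState(Boolean.TRUE)  // assumed Boolean overload added by SOLR-15052
              .process(client);
        }
      }
    }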
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index b010de2..3c872fb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -19,12 +19,14 @@ package org.apache.solr.cloud;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -179,7 +181,14 @@ class ShardLeaderElectionContextBase extends ElectionContext {
           ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
       assert zkController != null;
       assert zkController.getOverseer() != null;
-      zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+      DocCollection coll = zkStateReader.getCollection(this.collection);
+      if (coll == null || coll.getStateFormat() < 2 || ZkController.sendToOverseer(coll, id)) {
+        zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+      } else {
+        PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+        PerReplicaStates.WriteOps writeOps = PerReplicaStates.WriteOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs);
+        PerReplicaStates.persist(writeOps, coll.getZNode(), zkStateReader.getZkClient());
+      }
     }
   }
 
@@ -187,9 +196,4 @@ class ShardLeaderElectionContextBase extends ElectionContext {
     return leaderElector;
   }
 
-  Integer getLeaderZkNodeParentVersion() {
-    synchronized (lock) {
-      return leaderZkNodeParentVersion;
-    }
-  }
 }
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5df5cee..ab62384 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -64,31 +64,8 @@ import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ConnectionManager;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocCollectionWatcher;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.common.cloud.NodesSysPropsCacher;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.cloud.Replica.Type;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.UrlScheme;
-import org.apache.solr.common.cloud.ZkACLProvider;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkCredentialsProvider;
-import org.apache.solr.common.cloud.ZkMaintenanceUtils;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
@@ -1606,12 +1583,40 @@ public class ZkController implements Closeable {
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      overseerJobQueue.offer(Utils.toJSON(m));
+      DocCollection coll = zkStateReader.getCollection(collection);
+      if (forcePublish || sendToOverseer(coll, coreNodeName)) {
+        overseerJobQueue.offer(Utils.toJSON(m));
+      } else {
+        if (log.isDebugEnabled()) {
+          log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
+        }
+        PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+        PerReplicaStates.WriteOps ops = PerReplicaStates.WriteOps.flipState(coreNodeName, state, perReplicaStates);
+        PerReplicaStates.persist(ops, coll.getZNode(), zkClient);
+      }
     } finally {
       MDCLoggingContext.clear();
     }
   }
 
+  /**
+   * Whether a state update message needs to be sent to the overseer or not
+   */
+  static boolean sendToOverseer(DocCollection coll, String replicaName) {
+    if (coll == null) return true;
+    if (!coll.isPerReplicaState()) return true;
+    Replica r = coll.getReplica(replicaName);
+    if (r == null) return true;
+    Slice shard = coll.getSlice(r.shard);
+    if (shard == null) return true;//very unlikely
+    if (shard.getState() == Slice.State.RECOVERY) return true;
+    if (shard.getParent() != null) return true;
+    for (Slice slice : coll.getSlices()) {
+      if (Objects.equals(shard.getName(), slice.getParent())) return true;
+    }
+    return false;
+  }
+
   public ZkShardTerms getShardTerms(String collection, String shardId) {
     return getCollectionTerms(collection).getShard(shardId);
   }
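The ZkController.publish() change above is the core of this commit: when a collection has perReplicaState enabled and sendToOverseer() returns false, the replica records its own state directly in ZooKeeper instead of queueing a message for the Overseer. A condensed, illustrative sketch of that direct write path follows; it uses only calls that appear in the diff above (PerReplicaStates.fetch, PerReplicaStates.WriteOps.flipState, PerReplicaStates.persist), and the class and method names are placeholders.

    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.PerReplicaStates;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.SolrZkClient;

    class DirectStateWriteSketch {
      // Mirrors the non-Overseer branch added to ZkController.publish() above.
      static void flipReplicaState(DocCollection coll, String coreNodeName,
                                   Replica.State state, SolrZkClient zkClient) throws Exception {
        // Read the current per-replica state entries stored under the collection's znode.
        PerReplicaStates current =
            PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
        // Compute the znode operations needed to record the new state for this replica.
        PerReplicaStates.WriteOps ops =
            PerReplicaStates.WriteOps.flipState(coreNodeName, state, current);
        // Apply them directly in ZooKeeper; no Overseer round trip, no state.json rewrite.
        PerReplicaStates.persist(ops, coll.getZNode(), zkClient);
      }
    }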
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 4150842..f601c24 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -36,10 +36,10 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
 import org.apache.solr.client.solrj.cloud.BadVersionException;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -62,7 +62,6 @@ import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.UrlScheme;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -75,6 +74,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.TimeSource;
@@ -84,7 +84,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.CreateMode;
@@ -92,17 +91,19 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -142,7 +143,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       ZkStateReader.REPLICATION_FACTOR, "1",
       ZkStateReader.NRT_REPLICAS, "1",
       ZkStateReader.TLOG_REPLICAS, "0",
-      ZkStateReader.PULL_REPLICAS, "0"));
+      ZkStateReader.PULL_REPLICAS, "0",
+      WITH_COLLECTION, null,
+      COLOCATED_WITH, null));
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String FAILURE_FIELD = "failure";
@@ -305,7 +308,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
       throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
-        CORE_NODE_NAME_PROP, NODE_NAME_PROP, REJOIN_AT_HEAD_PROP);
+        CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
@@ -315,9 +318,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
     params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
     params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
-    params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+    params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
 
-    String baseUrl = UrlScheme.INSTANCE.getBaseUrlForNodeName(message.getStr(NODE_NAME_PROP));
+    String baseUrl = message.getStr(BASE_URL_PROP);
     ShardRequest sreq = new ShardRequest();
     sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
     // yes, they must use same admin handler path everywhere...
@@ -427,9 +430,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
         ZkStateReader.CORE_NAME_PROP, core,
-        ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
+        ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
         ZkStateReader.COLLECTION_PROP, collectionName,
-        ZkStateReader.CORE_NODE_NAME_PROP, replicaName);
+        ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
+        ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
     overseer.offerStateUpdate(Utils.toJSON(m));
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index b64ca49..c94b445 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -28,8 +28,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
@@ -46,10 +48,12 @@ public class CollectionMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
+  protected final SolrZkClient zkClient;
 
   public CollectionMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
+    this.zkClient = SliceMutator.getZkClient(cloudManager);
   }
 
   public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) {
@@ -107,7 +111,21 @@ public class CollectionMutator {
     DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP));
     Map<String, Object> m = coll.shallowCopy();
     boolean hasAnyOps = false;
+    PerReplicaStates.WriteOps replicaOps = null;
     for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+      if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
+        String val = message.getStr(DocCollection.PER_REPLICA_STATE);
+        if (val == null) continue;
+        boolean enable = Boolean.parseBoolean(val);
+        if (enable == coll.isPerReplicaState()) {
+          //requested value is the same as the current value; nothing to do
+          log.error("trying to set perReplicaState to {} from {}", val, coll.isPerReplicaState());
+          continue;
+        }
+        replicaOps = PerReplicaStates.WriteOps.modifyCollection(coll, enable, PerReplicaStates.fetch(coll.getZNode(), zkClient, null));
+      }
+
+
       if (message.containsKey(prop)) {
         hasAnyOps = true;
         if (message.get(prop) == null)  {
@@ -136,8 +154,12 @@ public class CollectionMutator {
       return ZkStateWriter.NO_OP;
     }
 
-    return new ZkWriteCommand(coll.getName(),
-        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion()));
+    DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion());
+    if (replicaOps == null){
+      return new ZkWriteCommand(coll.getName(), collection);
+    } else {
+      return new ZkWriteCommand(coll.getName(), collection, replicaOps, true);
+    }
   }
 
   public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 3f1971e..23d51c4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -45,6 +46,8 @@ public class NodeMutator {
 
     Map<String, DocCollection> collections = clusterState.getCollectionsMap();
     for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+       List<String> downedReplicas = new ArrayList<>();
+
       String collection = entry.getKey();
       DocCollection docCollection = entry.getValue();
 
@@ -68,6 +71,7 @@ public class NodeMutator {
                 Replica.State.DOWN, replica.type, props);
             newReplicas.put(replica.getName(), newReplica);
             needToUpdateCollection = true;
+            downedReplicas.add(replica.getName());
           }
         }
 
@@ -76,7 +80,12 @@ public class NodeMutator {
       }
 
       if (needToUpdateCollection) {
-        zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+        if (docCollection.isPerReplicaState()) {
+          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
+              PerReplicaStates.WriteOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+        } else {
+          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+        }
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f849143..1ce84ca 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -39,8 +39,10 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 public class ReplicaMutator {
@@ -57,10 +60,12 @@ public class ReplicaMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
+  protected SolrZkClient zkClient;
 
   public ReplicaMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
+    this.zkClient = getZkClient(cloudManager);
   }
 
   protected Replica setProperty(Replica replica, String key, String value) {
@@ -260,6 +265,7 @@ public class ReplicaMutator {
       log.info("Failed to update state because the replica does not exist, {}", message);
       return ZkStateWriter.NO_OP;
     }
+    boolean persistCollectionState = collection != null && collection.isPerReplicaState();
 
     if (coreNodeName == null) {
       coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
@@ -271,6 +277,7 @@ public class ReplicaMutator {
           log.info("Failed to update state because the replica does not exist, {}", message);
           return ZkStateWriter.NO_OP;
         }
+        persistCollectionState = true;
         // if coreNodeName is null, auto assign one
         coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
       }
@@ -285,6 +292,7 @@ public class ReplicaMutator {
       if (sliceName != null) {
         log.debug("shard={} is already registered", sliceName);
       }
+      persistCollectionState = true;
     }
     if (sliceName == null) {
       //request new shardId
@@ -295,13 +303,15 @@ public class ReplicaMutator {
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
+      persistCollectionState = true;
     }
 
     Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
+    Replica oldReplica = null;
     if (slice != null) {
-      Replica oldReplica = slice.getReplica(coreNodeName);
+      oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -366,7 +376,12 @@ public class ReplicaMutator {
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
     log.debug("Collection is now: {}", newCollection);
-    return new ZkWriteCommand(collectionName, newCollection);
+    if (collection != null && collection.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStates.WriteOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+    } else{
+      return new ZkWriteCommand(collectionName, newCollection);
+    }
   }
 
   private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 40ab1a3..28139ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -25,14 +25,17 @@ import java.util.Set;
 import com.google.common.collect.ImmutableSet;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,10 +54,21 @@ public class SliceMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
+  protected final SolrZkClient zkClient;
 
   public SliceMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
+    this.zkClient = getZkClient(cloudManager);
+  }
+
+  static SolrZkClient getZkClient(SolrCloudManager cloudManager) {
+    if (cloudManager instanceof SolrClientCloudManager) {
+      SolrClientCloudManager manager = (SolrClientCloudManager) cloudManager;
+      return manager.getZkClient();
+    } else {
+      return null;
+    }
   }
 
   public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -80,7 +94,15 @@ public class SliceMutator {
             ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
             ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
             ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
-    return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+
+    if (collection.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
+          PerReplicaStates.WriteOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+    } else {
+      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+    }
+
   }
 
   public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -106,7 +128,12 @@ public class SliceMutator {
       newSlices.put(slice.getName(), slice);
     }
 
-    return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+
+    if (coll.isPerReplicaState()) {
+      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices), PerReplicaStates.WriteOps.deleteReplica(cnn, coll.getPerReplicaStates()) , true);
+    } else {
+      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+    }
   }
 
   public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
@@ -124,6 +151,7 @@ public class SliceMutator {
     Slice slice = slices.get(sliceName);
 
     Replica oldLeader = slice.getLeader();
+    Replica newLeader = null;
     final Map<String, Replica> newReplicas = new LinkedHashMap<>();
     for (Replica replica : slice.getReplicas()) {
       // TODO: this should only be calculated once and cached somewhere?
@@ -132,7 +160,7 @@ public class SliceMutator {
       if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
         replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
       } else if (coreURL.equals(leaderUrl)) {
-        replica = new ReplicaMutator(cloudManager).setLeader(replica);
+        newLeader = replica = new ReplicaMutator(cloudManager).setLeader(replica);
       }
 
       newReplicas.put(replica.getName(), replica);
@@ -141,8 +169,17 @@ public class SliceMutator {
     Map<String, Object> newSliceProps = slice.shallowCopy();
     newSliceProps.put(Slice.REPLICAS, newReplicas);
     slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
-    return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
-  }
+    if (coll.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
+          PerReplicaStates.WriteOps.flipLeader(
+              slice.getReplicaNames(),
+              newLeader == null ? null : newLeader.getName(),
+              prs), false);
+    } else {
+      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+    }
+  }    
 
   public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
     String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 155fbc2..a3bd236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -27,6 +27,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -63,7 +64,7 @@ public class ZkStateWriter {
   protected final ZkStateReader reader;
   protected final Stats stats;
 
-  protected Map<String, DocCollection> updates = new HashMap<>();
+  protected Map<String, ZkWriteCommand> updates = new HashMap<>();
   private int numUpdates = 0;
   protected ClusterState clusterState = null;
   protected long lastUpdatedTime = 0;
@@ -111,15 +112,47 @@ public class ZkStateWriter {
     if (cmds.isEmpty()) return prevState;
     if (isNoOps(cmds)) return prevState;
 
+    boolean forceFlush = false;
+    if (cmds.size() == 1) {
+      //most messages result in only one command. let's deal with it right away
+      ZkWriteCommand cmd = cmds.get(0);
+      if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+        //we do not wish to batch any updates for collections with per-replica state because
+        // these changes go to individual ZK nodes and there is zero advantage to batching
+        //now check if there are any updates for the same collection already present
+        if (updates.containsKey(cmd.name)) {
+          //this should not happen
+          // but let's get those updates out anyway
+          writeUpdate(updates.remove(cmd.name));
+        }
+        //now let's write the current message
+        try {
+          return writeUpdate(cmd);
+        } finally {
+          if (callback !=null) callback.onWrite();
+        }
+      }
+    } else {
+      //more than one command was created as a result of this message
+      for (ZkWriteCommand cmd : cmds) {
+        if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+          // we don't try to optimize for this case. let's flush out all after this
+          forceFlush = true;
+          break;
+        }
+      }
+    }
+
+
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
       prevState = prevState.copyWith(cmd.name, cmd.collection);
-      updates.put(cmd.name, cmd.collection);
+      updates.put(cmd.name, cmd);
       numUpdates++;
     }
     clusterState = prevState;
 
-    if (maybeFlushAfter()) {
+    if (forceFlush || maybeFlushAfter()) {
       ClusterState state = writePendingUpdates();
       if (callback != null) {
         callback.onWrite();
@@ -149,7 +182,15 @@ public class ZkStateWriter {
   public boolean hasPendingUpdates() {
     return numUpdates != 0;
   }
+  public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
+    Map<String, ZkWriteCommand> commands = new HashMap<>();
+    commands.put(command.name, command);
+    return writePendingUpdates(commands);
+  }
+  public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+    return writePendingUpdates(updates);
 
+  }
   /**
    * Writes all pending updates to ZooKeeper and returns the modified cluster state
    *
@@ -158,20 +199,31 @@ public class ZkStateWriter {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if (!hasPendingUpdates()) return clusterState;
+    if ((updates == this.updates)
+        && !hasPendingUpdates()) {
+      return clusterState;
+    }
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          DocCollection c = entry.getValue();
+          ZkWriteCommand cmd = entry.getValue();
+          DocCollection c = cmd.collection;
+
+          if (cmd.ops != null && cmd.ops.isPreOp()) {
+            PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
 
+            clusterState = clusterState.copyWith(name,
+                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+          }
+          if (!cmd.persistCollState) continue;
           if (c == null) {
             // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
             log.debug("going to delete state.json {}", path);
@@ -192,6 +244,14 @@ public class ZkStateWriter {
               clusterState = clusterState.copyWith(name, newCollection);
             }
           }
+          if (cmd.ops != null && !cmd.ops.isPreOp()) {
+            PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+            DocCollection currentCollState = clusterState.getCollection(cmd.name);
+            if ( currentCollState != null) {
+              clusterState = clusterState.copyWith(name,
+                  currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+            }
+          }
         }
 
         updates.clear();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index d464863..2f71674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,16 +17,34 @@
 package org.apache.solr.cloud.overseer;
 
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 
 public class ZkWriteCommand {
+
   public final String name;
   public final DocCollection collection;
+
   public final boolean noop;
+  // persist the collection state. If this is false, it means the collection state is not modified
+  public final boolean persistCollState;
+  public final PerReplicaStates.WriteOps ops;
 
+  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStates.WriteOps replicaOps, boolean persistCollState) {
+    boolean isPerReplicaState = collection.isPerReplicaState();
+    this.name = name;
+    this.collection = collection;
+    this.noop = false;
+    this.ops = isPerReplicaState ? replicaOps : null;
+    this.persistCollState = isPerReplicaState ? persistCollState : true;
+  }
   public ZkWriteCommand(String name, DocCollection collection) {
     this.name = name;
     this.collection = collection;
     this.noop = false;
+    persistCollState = true;
+    this.ops = collection != null && collection.isPerReplicaState() ?
+        PerReplicaStates.WriteOps.touchChildren():
+        null;
   }
 
   /**
@@ -36,6 +54,8 @@ public class ZkWriteCommand {
     this.noop = true;
     this.name = null;
     this.collection = null;
+    this.ops = null;
+    persistCollState = true;
   }
 
   public static ZkWriteCommand noop() {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 399b87b..b2e99e6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -118,6 +118,7 @@ import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHan
 import static org.apache.solr.cloud.api.collections.RoutedAlias.CREATE_COLLECTION_PREFIX;
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
+import static org.apache.solr.common.cloud.DocCollection.PER_REPLICA_STATE;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
@@ -229,8 +230,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
       }
       CollectionOperation operation = CollectionOperation.get(action);
-      if (log.isDebugEnabled()) {
-        log.debug("Invoked Collection Action :{} with params {} and sendToOCPQueue={}"
+      if (log.isInfoEnabled()) {
+        log.info("Invoked Collection Action :{} with params {} and sendToOCPQueue={}"
             , action.toLower(), req.getParamString(), operation.sendToOCPQueue);
       }
       MDCLoggingContext.setCollection(req.getParams().get(COLLECTION));
@@ -462,6 +463,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           TLOG_REPLICAS,
           NRT_REPLICAS,
           WAIT_FOR_FINAL_STATE,
+          PER_REPLICA_STATE,
           ALIAS);
 
       if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
@@ -1321,7 +1323,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         //TODO only increase terms of replicas less out-of-sync
         liveReplicas.stream()
             .filter(rep -> zkShardTerms.registered(rep.getName()))
-            // TODO should this all be done at once instead of increasing each replica individually?
             .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
       }
 
@@ -1404,6 +1405,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
               }
               if (!n.contains(replica.getNodeName())
                   || !state.equals(Replica.State.ACTIVE.toString())) {
+                if (log.isDebugEnabled()) {
+                  log.debug("inactive replica {} , state {}", replica.getName(), replica.getReplicaState());
+                }
                 replicaNotAliveCnt++;
                 return false;
               }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index c4fc4e4..e3a8916 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.solr.cloud;
 
-import static java.util.Arrays.asList;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_DEF;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.NUM_SHARDS_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
-import static org.apache.solr.common.params.CollectionAdminParams.DEFAULTS;
-
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
@@ -77,6 +70,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Arrays.asList;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_DEF;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.NUM_SHARDS_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.DEFAULTS;
+
 @LuceneTestCase.Slow
 public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -104,6 +104,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateWithDefaultConfigSet() throws Exception {
     String collectionName = "solrj_default_configset";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, 2, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -236,6 +237,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
       assertEquals("2", String.valueOf(clusterProperty));
       CollectionAdminResponse response = CollectionAdminRequest
           .createCollection(COLL_NAME, "conf", null, null, null, null)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(cluster.getSolrClient());
       assertEquals(0, response.getStatus());
       assertTrue(response.isSuccess());
@@ -406,7 +408,9 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateAndDeleteAlias() throws IOException, SolrServerException {
 
     final String collection = "aliasedCollection";
-    CollectionAdminRequest.createCollection(collection, "conf", 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     CollectionAdminResponse response
         = CollectionAdminRequest.createAlias("solrj_alias", collection).process(cluster.getSolrClient());
@@ -421,6 +425,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
     final String collectionName = "solrj_test_splitshard";
     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     cluster.waitForActiveCollection(collectionName, 2, 2);
@@ -475,6 +480,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     cluster.getJettySolrRunners().forEach(j -> j.getCoreContainer().getAllowPaths().add(tmpDir));
 
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty(CoreAdminParams.DATA_DIR, dataDir.toString())
         .withProperty(CoreAdminParams.ULOG_DIR, ulogDir.toString())
         .process(cluster.getSolrClient());
@@ -501,6 +507,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
     final String collectionName = "solrj_replicatests";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 1, 2);
@@ -569,6 +576,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     final String propName = "testProperty";
 
     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collectionName, 2, 4);
@@ -600,6 +608,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testColStatus() throws Exception {
     final String collectionName = "collectionStatusTest";
     CollectionAdminRequest.createCollection(collectionName, "conf2", 2, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     cluster.waitForActiveCollection(collectionName, 2, 4);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
index e9afc3b..3f86835 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
@@ -18,7 +18,9 @@ package org.apache.solr.cloud;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -29,12 +31,16 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.solr.cloud.SolrCloudTestCase.configureCluster;
+
 public class ZkSolrClientTest extends SolrTestCaseJ4 {
 
   @BeforeClass
@@ -376,6 +382,31 @@ public class ZkSolrClientTest extends SolrTestCaseJ4 {
     }
   }
 
+  public void testZkBehavior() throws Exception {
+    MiniSolrCloudCluster cluster =
+        configureCluster(4)
+            .withJettyConfig(jetty -> jetty.enableV2(true))
+            .configure();
+    try {
+      SolrZkClient zkClient = cluster.getZkClient();
+      zkClient.create("/test-node", null, CreateMode.PERSISTENT, true);
+
+      Stat stat = zkClient.exists("/test-node", null, true);
+      int cversion = stat.getCversion();
+      List<Op> ops = Arrays.asList(
+          Op.create("/test-node/abc", null, zkClient.getZkACLProvider().getACLsToAdd("/test-node/abc"), CreateMode.PERSISTENT),
+          Op.delete("/test-node/abc", -1));
+      zkClient.multi(ops, true);
+      stat = zkClient.exists("/test-node", null, true);
+      assertTrue(stat.getCversion() >= cversion + 2);
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
+
+
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
diff --git a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
index dd8aead..9d196cf 100644
--- a/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/PingRequestHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.SolrPing;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
@@ -188,6 +189,7 @@ public class PingRequestHandlerTest extends SolrTestCaseJ4 {
       String configName = "solrCloudCollectionConfig";
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1").resolve("conf"), configName);
       CollectionAdminRequest.createCollection(collectionName, configName, NUM_SHARDS, REPLICATION_FACTOR)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       // Send distributed and non-distributed ping query
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 96555ca..5bc68532 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -63,7 +63,9 @@ public class TestSQLHandler extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection(collection, 2, 2);
     if (useAlias) {
       CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
index 3622948..d2e5cad 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
@@ -24,9 +24,9 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -35,15 +35,14 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
-import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.lucene.util.TestUtil;
-
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -51,9 +50,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.util.TimeOut;
 import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -61,7 +59,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Nightly
+//@Nightly
 @SuppressCodecs({"SimpleText"})
 @LogLevel("org.apache.solr.handler.SnapShooter=DEBUG;org.apache.solr.core.IndexDeletionPolicyWrapper=DEBUG")
 public class TestStressThreadBackup extends SolrCloudTestCase {
@@ -96,7 +94,8 @@ public class TestStressThreadBackup extends SolrCloudTestCase {
         .configure();
 
     assertEquals(0, (CollectionAdminRequest.createCollection(DEFAULT_TEST_COLLECTION_NAME, "conf1", 1, 1)
-                     .process(cluster.getSolrClient()).getStatus()));
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient()).getStatus()));
     adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
     initCoreNameAndSolrCoreClient();
   }
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
index f6252db..5aa17f3 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/HealthCheckHandlerTest.java
@@ -80,6 +80,7 @@ public class HealthCheckHandlerTest extends SolrCloudTestCase {
     try (HttpSolrClient httpSolrClient = getHttpSolrClient(cluster.getJettySolrRunner(0).getBaseUrl().toString())) {
       CollectionAdminResponse collectionAdminResponse = CollectionAdminRequest.createCollection("test", "_default", 1, 1)
           .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(httpSolrClient);
       assertEquals(0, collectionAdminResponse.getStatus());
       SolrResponse response = req.process(httpSolrClient);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
index ea536be..835c8b1 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
@@ -78,7 +78,8 @@ public class MetricsHistoryHandlerTest extends SolrCloudTestCase {
 
     // create .system collection
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL,
-        "conf", 1, 1);
+        "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
     create.process(solrClient);
     CloudUtil.waitForState(cloudManager, "failed to create " + CollectionAdminParams.SYSTEM_COLL,
         CollectionAdminParams.SYSTEM_COLL, CloudUtil.clusterShape(1, 1));
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index f0b2973..aa17b55 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -140,6 +141,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
     
       QueryRequest req = new QueryRequest();
@@ -182,6 +184,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       ModifiableSolrParams params = new ModifiableSolrParams();
@@ -229,6 +232,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
 
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 1)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
           .process(miniCluster.getSolrClient());
 
       ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
index 0f9221c..524dff9 100644
--- a/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
+++ b/solr/core/src/test/org/apache/solr/response/transform/TestSubQueryTransformerDistrib.java
@@ -78,10 +78,12 @@ public class TestSubQueryTransformerDistrib extends SolrCloudTestCase {
     int replicas = 2 ;
     CollectionAdminRequest.createCollection(people, configName, shards, replicas)
         .withProperty("config", "solrconfig-doctransformers.xml")
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty("schema", "schema-docValuesJoin.xml")
         .process(cluster.getSolrClient());
 
     CollectionAdminRequest.createCollection(depts, configName, shards, replicas)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .withProperty("config", "solrconfig-doctransformers.xml")
         .withProperty("schema", 
               differentUniqueId ? "schema-minimal-with-another-uniqkey.xml":
diff --git a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
index a2375ba..f022ac3 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestManagedSchemaAPI.java
@@ -51,6 +51,8 @@ public class TestManagedSchemaAPI extends SolrCloudTestCase {
   public void test() throws Exception {
     String collection = "testschemaapi";
     CollectionAdminRequest.createCollection(collection, "conf1", 1, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+
         .process(cluster.getSolrClient());
     testModifyField(collection);
     testReloadAndAddSimple(collection);
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
index a88ed95..c1dc549 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetJoinDomain.java
@@ -109,6 +109,7 @@ public class TestCloudJSONFacetJoinDomain extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
index 0992af8..52fb73e 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKG.java
@@ -45,18 +45,17 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
-import static org.apache.solr.search.facet.RelatednessAgg.computeRelatedness;
-import static org.apache.solr.search.facet.RelatednessAgg.roundTo5Digits;
-
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.noggit.JSONUtil;
 import org.noggit.JSONWriter;
 import org.noggit.JSONWriter.Writable;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.search.facet.RelatednessAgg.computeRelatedness;
+import static org.apache.solr.search.facet.RelatednessAgg.roundTo5Digits;
+
 /** 
  * <p>
  * A randomized test of nested facets using the <code>relatedness()</code> function, that asserts the 
@@ -139,6 +138,7 @@ public class TestCloudJSONFacetSKG extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
index eb68662..5c3e832 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestCloudJSONFacetSKGEquiv.java
@@ -20,8 +20,8 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
@@ -47,18 +47,17 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
-import static org.apache.solr.search.facet.FacetField.FacetMethod;
-import static org.apache.solr.search.facet.SlotAcc.SweepingCountSlotAcc.SWEEP_COLLECTION_DEBUG_KEY;
-
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.noggit.JSONUtil;
 import org.noggit.JSONWriter;
 import org.noggit.JSONWriter.Writable;
-  
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.search.facet.FacetField.FacetMethod;
+import static org.apache.solr.search.facet.SlotAcc.SweepingCountSlotAcc.SWEEP_COLLECTION_DEBUG_KEY;
+
 /** 
  * <p>
  * A randomized test of nested facets using the <code>relatedness()</code> function, that asserts the 
@@ -133,6 +132,7 @@ public class TestCloudJSONFacetSKGEquiv extends SolrCloudTestCase {
     collectionProperties.put("config", "solrconfig-tlog.xml");
     collectionProperties.put("schema", "schema_latest.xml");
     CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
index ebdb960..a42ebaa 100644
--- a/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
+++ b/solr/core/src/test/org/apache/solr/search/join/CrossCollectionJoinQueryTest.java
@@ -54,9 +54,11 @@ public class CrossCollectionJoinQueryTest extends SolrCloudTestCase {
 
 
     CollectionAdminRequest.createCollection("products", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
     CollectionAdminRequest.createCollection("parts", "ccjoin", NUM_SHARDS, NUM_REPLICAS)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
 
   }
diff --git a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
index 4fc1f3a..cf0ac7e 100644
--- a/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
+++ b/solr/core/src/test/org/apache/solr/search/stats/TestDistribIDF.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
@@ -200,7 +201,7 @@ public class TestDistribIDF extends SolrTestCaseJ4 {
       response = create.process(solrCluster.getSolrClient());
       solrCluster.waitForActiveCollection(name, 3, 3);
     } else {
-      CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1);
+      CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(name,config,2,1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE);
       response = create.process(solrCluster.getSolrClient());
       solrCluster.waitForActiveCollection(name, 2, 2);
     }
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
index 6eb122a..c8ae8b2 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/TemplateUpdateProcessorTest.java
@@ -88,7 +88,7 @@ public class TemplateUpdateProcessorTest extends SolrCloudTestCase {
     params.add("commit", "true");
     UpdateRequest add = new UpdateRequest().add(solrDoc);
     add.setParams(params);
-    NamedList<Object> result = cluster.getSolrClient().request(CollectionAdminRequest.createCollection("c", "conf1", 1, 1));
+    NamedList<Object> result = cluster.getSolrClient().request(CollectionAdminRequest.createCollection("c", "conf1", 1, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE));
     Utils.toJSONString(result.asMap(4));
     AbstractFullDistribZkTestBase.waitForCollection(cluster.getSolrClient().getZkStateReader(), "c",1);
     cluster.getSolrClient().request(add, "c");
diff --git a/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java b/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
index 23f75ec..fc5dcb2 100644
--- a/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
+++ b/solr/core/src/test/org/apache/solr/util/tracing/TestDistributedTracing.java
@@ -56,6 +56,7 @@ public class TestDistributedTracing extends SolrCloudTestCase {
     waitForSampleRateUpdated(1.0);
     CollectionAdminRequest
         .createCollection(COLLECTION, "config", 2, 2)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection(COLLECTION, 2, 4);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
index cac6cd2..feae27f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -100,6 +101,12 @@ public interface DistribStateManager extends SolrCloseable {
     return tree;
   }
 
+  default PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
+    throw new UnsupportedOperationException("Not implemented");
+
+
+  }
+
   /**
    * Remove data recursively.
    * @param root root path
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 5ad7ff4..e1a9fd8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -37,8 +37,8 @@ import org.apache.http.util.EntityUtils;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -187,6 +187,9 @@ public class SolrClientCloudManager implements SolrCloudManager {
       return EMPTY;
     }
   }
+  public SolrZkClient getZkClient() {
+    return zkClient;
+  }
 
   @Override
   public DistributedQueueFactory getDistributedQueueFactory() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
index d2c3f40..fe4b961 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
@@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.VersionedData;
 import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -181,4 +182,9 @@ public class ZkDistribStateManager implements DistribStateManager {
   public SolrZkClient getZkClient() {
     return zkClient;
   }
+
+  @Override
+  public PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
+    return PerReplicaStates.fetch(path, zkClient, null);
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 44454e0..12bfc4b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -54,6 +54,7 @@ import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 
+import static org.apache.solr.common.cloud.DocCollection.PER_REPLICA_STATE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.READ_ONLY;
@@ -80,6 +81,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   public static final java.util.List<String> MODIFIABLE_COLLECTION_PROPERTIES = Arrays.asList(
       REPLICATION_FACTOR,
       COLL_CONF,
+      PER_REPLICA_STATE,
       READ_ONLY);
 
   protected final CollectionAction action;
@@ -433,6 +435,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Integer nrtReplicas;
     protected Integer pullReplicas;
     protected Integer tlogReplicas;
+    protected Boolean perReplicaState;
 
     protected Properties properties;
     protected String alias;
@@ -473,6 +476,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setReplicationFactor(Integer repl) { this.nrtReplicas = repl; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
+    public Create setPerReplicaState(Boolean b) {this.perReplicaState = b; return this; }
 
     public Create setAlias(String alias) {
       this.alias = alias;
@@ -489,6 +493,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Integer getNumNrtReplicas() { return nrtReplicas; }
     public Integer getNumTlogReplicas() {return tlogReplicas;}
     public Integer getNumPullReplicas() {return pullReplicas;}
+    public Boolean getPerReplicaState() {return perReplicaState;}
 
     /**
      * Provide the name of the shards to be created, separated by commas
@@ -559,6 +564,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (tlogReplicas != null) {
         params.set(ZkStateReader.TLOG_REPLICAS, tlogReplicas);
       }
+      if (Boolean.TRUE.equals(perReplicaState)) {
+        params.set(PER_REPLICA_STATE, perReplicaState);
+      }
       params.setNonNull(ALIAS, alias);
       return params;
     }
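
A hedged SolrJ usage sketch for the new flag follows; the ZooKeeper address, collection name and configset name are illustrative and not part of this commit:

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.client.solrj.response.CollectionAdminResponse;

    public class CreatePerReplicaStateCollection {
      public static void main(String[] args) throws Exception {
        // Assumes a running SolrCloud with ZK at localhost:2181 and an uploaded configset "conf1".
        try (CloudSolrClient client = new CloudSolrClient.Builder(
            Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
          CollectionAdminResponse rsp = CollectionAdminRequest
              .createCollection("prs_demo", "conf1", 2, 2)
              .setPerReplicaState(Boolean.TRUE)   // sends perReplicaState=true with the CREATE call
              .process(client);
          System.out.println("create status: " + rsp.getStatus());
        }
      }
    }
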
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index ebed0ff..5356a9f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -23,15 +24,19 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Immutable state of the cloud. Normally you can get the state by using
@@ -39,6 +44,8 @@ import org.noggit.JSONWriter;
  * @lucene.experimental
  */
 public class ClusterState implements JSONWriter.Writable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
 
   private final Map<String, CollectionRef> collectionStates, immutableCollectionStates;
   private Set<String> liveNodes;
@@ -241,6 +248,12 @@ public class ClusterState implements JSONWriter.Writable {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
+    if ("true".equals(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
+      log.info("a collection {} has per-replica state", name); // nocommit should be a debug
+      //this collection has replica states stored outside
+      ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
+      if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
+    }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(DocCollection.SHARDS);
     if (sliceObjs == null) {
@@ -383,5 +396,63 @@ public class ClusterState implements JSONWriter.Writable {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+  }
+  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+    REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
+  }
+
+
+  public static void clearReplicaStateProvider(){
+    REPLICASTATES_PROVIDER.remove();
+  }
+
+  private static class StatesProvider implements ReplicaStatesProvider {
+    private final Supplier<PerReplicaStates> replicaStatesSupplier;
+    private PerReplicaStates perReplicaStates;
+    private boolean isPerReplicaState = false;
+
+    public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+      this.replicaStatesSupplier = replicaStatesSupplier;
+    }
+
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return isPerReplicaState ? Optional.of(this) : Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      if (perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
+      return perReplicaStates;
+    }
+
+  }
+
 
 }
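
The ThreadLocal provider above is intended to be installed around state.json parsing so that DocCollection can pick up per-replica states lazily. A minimal sketch, assuming the caller already holds a SolrZkClient and some parser for state.json (the helper name is made up):

    // Sketch only: wraps an arbitrary state.json parse step with the provider.
    static DocCollection readWithPerReplicaStates(String coll, SolrZkClient zkClient,
                                                  java.util.function.Supplier<DocCollection> parseStateJson) {
      String path = ZkStateReader.getCollectionPath(coll);
      ClusterState.initReplicaStateProvider(() -> PerReplicaStates.fetch(path, zkClient, null));
      try {
        // If the parsed props contain perReplicaState=true, DocCollection asks the
        // provider for the fetched PerReplicaStates and attaches it.
        return parseStateJson.get();
      } finally {
        ClusterState.clearReplicaStateProvider();
      }
    }
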
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index c35ee8a..c35c750 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -30,6 +31,8 @@ import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
@@ -42,9 +45,12 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
  */
 public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
 
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
+  public static final String PER_REPLICA_STATE = "perReplicaState";
 
   private final int znodeVersion;
 
@@ -55,12 +61,17 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Map<String, List<Replica>> nodeNameReplicas;
   private final Map<String, List<Replica>> nodeNameLeaderReplicas;
   private final DocRouter router;
+  private final String znode;
 
   private final Integer replicationFactor;
   private final Integer numNrtReplicas;
   private final Integer numTlogReplicas;
   private final Integer numPullReplicas;
   private final Boolean readOnly;
+  private final Boolean perReplicaState;
+  private final Map<String, Replica> replicaMap = new HashMap<>();
+  private volatile PerReplicaStates perReplicaStates;
+
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
     this(name, slices, props, router, Integer.MAX_VALUE);
@@ -86,6 +97,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
     this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
     this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
+    this.perReplicaState = (Boolean) verifyProp(props, PER_REPLICA_STATE, Boolean.FALSE);
+    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> perReplicaStates = it.getStates());
     Boolean readOnly = (Boolean) verifyProp(props, READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
     
@@ -98,13 +111,42 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       }
       for (Replica replica : slice.getValue()) {
         addNodeNameReplica(replica);
+        if (perReplicaState) {
+          replicaMap.put(replica.getName(), replica);
+        }
       }
     }
     this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
     this.router = router;
+    this.znode = ZkStateReader.getCollectionPath(name);
     assert name != null && slices != null;
   }
 
+  /** Update this state with a fresh {@link PerReplicaStates}.
+   * Used to create a new collection state when only replica states have changed
+   */
+  public DocCollection copyWith( PerReplicaStates newPerReplicaStates) {
+    log.debug("collection :{} going to be updated :  per-replica state :{} -> {}",
+        name,
+        getChildNodesVersion(), newPerReplicaStates.cversion);
+    if (getChildNodesVersion() == newPerReplicaStates.cversion) return this;
+    Set<String> modifiedReplicas = PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
+    if (modifiedReplicas.isEmpty()) return this; //nothing is modified
+    Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
+    for (String s : modifiedReplicas) {
+      Replica replica = getReplica(s);
+      if (replica != null) {
+        Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
+        Slice shard = modifiedShards.get(replica.shard);
+        modifiedShards.put(replica.shard, shard.copyWith(newReplica));
+      }
+    }
+    DocCollection result = new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion);
+    result.perReplicaStates = newPerReplicaStates;
+    return result;
+
+  }
+
   private void addNodeNameReplica(Replica replica) {
     List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
     if (replicas == null) {
@@ -138,6 +180,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
         return Integer.parseInt(o.toString());
       case READ_ONLY:
         return Boolean.parseBoolean(o.toString());
+      case PER_REPLICA_STATE:
+        return Boolean.parseBoolean(o.toString());
       case "snitch":
       default:
         return o;
@@ -149,10 +193,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @param slices the new set of Slices
    * @return the resulting DocCollection
    */
-  public DocCollection copyWithSlices(Map<String, Slice> slices){
-    return new DocCollection(getName(), slices, propMap, router, znodeVersion);
+  public DocCollection copyWithSlices(Map<String, Slice> slices) {
+    DocCollection result = new DocCollection(getName(), slices, propMap, router, znodeVersion);
+    result.perReplicaStates = perReplicaStates;
+    return result;
   }
-
   /**
    * Return collection name.
    */
@@ -224,6 +269,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public int getZNodeVersion(){
     return znodeVersion;
   }
+  public int getChildNodesVersion() {
+    return perReplicaStates == null ? -1 : perReplicaStates.cversion;
+  }
+
+  public boolean isModified(int dataVersion, int childVersion) {
+    if (dataVersion > znodeVersion) return true;
+    if (childVersion > getChildNodesVersion()) return true;
+    return false;
+
+  }
 
   /**
    * @return replication factor for this collection or null if no
@@ -232,7 +287,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Integer getReplicationFactor() {
     return replicationFactor;
   }
-  
+
+  public String getZNode(){
+    return znode;
+  }
+
   public DocRouter getRouter() {
     return router;
   }
@@ -243,7 +302,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public String toString() {
-    return "DocCollection("+name+"/" + znodeVersion + ")=" + toJSONString(this);
+    return "DocCollection("+name+"/" + znode + "/" + znodeVersion
+        + (perReplicaStates == null ? "": perReplicaStates.toString())+")="
+        + toJSONString(this);
   }
 
   @Override
@@ -255,6 +316,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   }
 
   public Replica getReplica(String coreNodeName) {
+    if (perReplicaState) {
+      return replicaMap.get(coreNodeName);
+    }
     for (Slice slice : slices.values()) {
       Replica replica = slice.getReplica(coreNodeName);
       if (replica != null) return replica;
@@ -375,6 +439,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Integer getNumPullReplicas() {
     return numPullReplicas;
   }
+  public boolean isPerReplicaState() {
+    return Boolean.TRUE.equals(perReplicaState);
+  }
+
+  public PerReplicaStates getPerReplicaStates() {
+    return perReplicaStates;
+  }
+
 
   public int getExpectedReplicaCount(Replica.Type type, int def) {
     Integer result = null;
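
A hedged sketch of how a cached DocCollection could be refreshed when only the per-replica children changed (the helper name is made up; it relies only on the accessors added above):

    static DocCollection maybeRefresh(DocCollection cached, SolrZkClient zkClient) {
      if (!cached.isPerReplicaState()) return cached;
      // fetch() returns the same object if the child cversion has not moved
      PerReplicaStates fresh = PerReplicaStates.fetch(
          cached.getZNode(), zkClient, cached.getPerReplicaStates());
      // copyWith() returns 'cached' unchanged when nothing was modified,
      // otherwise it rebuilds only the slices whose replicas changed
      return cached.copyWith(fresh);
    }
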
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
new file mode 100644
index 0000000..d66b4c5
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection.
+ * This is an immutable object. When states are modified, a new instance is constructed.
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /** Get the replicas whose state has changed
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+  /**
+   * This is a persist operation with retry if a write fails due to stale state
+   */
+  public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    List<Operation> operations = ops.get();
+    for (int i = 0; i < 10; i++) {
+      try {
+        persist(operations, znode, zkClient);
+        return;
+      } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
+        //state is stale
+        log.info("stale state for {}. retrying...", znode);
+        operations = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
+      }
+    }
+  }
+
+  /**
+   * Persist a set of operations to Zookeeper
+   */
+  public static void persist(List<Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    if (operations == null || operations.isEmpty()) return;
+    log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
+
+    List<Op> ops = new ArrayList<>(operations.size());
+    for (Operation op : operations) {
+      //the state of the replica is being updated
+      String path = znode + "/" + op.state.asString;
+      List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
+      ops.add(op.typ == Operation.Type.ADD ?
+          Op.create(path, null, acls, CreateMode.PERSISTENT) :
+          Op.delete(path, -1));
+    }
+    try {
+      zkClient.multi(ops, true);
+      if (log.isDebugEnabled()) {
+        //nocommit
+        try {
+          Stat stat = zkClient.exists(znode, null, true);
+          log.debug("After update, cversion : {}", stat.getCversion());
+        } catch (Exception e) {
+        }
+
+      }
+    } catch (KeeperException e) {
+      log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
+      throw e;
+    }
+
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates}. It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current; // not modified
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  private static List<Operation> addDeleteStaleNodes(List<Operation> ops, State rs) {
+    while (rs != null) {
+      ops.add(new Operation(Operation.Type.DELETE, rs));
+      rs = rs.duplicate;
+    }
+    return ops;
+  }
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica, stored as a child node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * If there are multiple entries for the same replica, e.g. core_node_1:12:A and core_node_1:13:D,
+     * <p>
+     * the entry with version '13' is the latest and the one with '12' is considered a duplicate.
+     * <p>
+     * These are unlikely, but possible.
+     */
+    final State duplicate;
+
+    private State(String serialized, List<String> pieces) {
+      this.asString = serialized;
+      replica = pieces.get(0);
+      version = Integer.parseInt(pieces.get(1));
+      String encodedStatus = pieces.get(2);
+      this.state = Replica.getState(encodedStatus);
+      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+      duplicate = null;
+    }
+
+    public static State parse(String serialized) {
+      List<String> pieces = StrUtils.splitSmart(serialized, ':');
+      if (pieces.size() < 3) return null;
+      return new State(serialized, pieces);
+
+    }
+
+    public State(String replica, Replica.State state, Boolean isLeader, int version) {
+      this(replica, state, isLeader, version, null);
+    }
+
+    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+      this.replica = replica;
+      this.state = state == null ? Replica.State.ACTIVE : state;
+      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+      this.version = version;
+      asString = serialize();
+      this.duplicate = duplicate;
+    }
+
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put(NAME, replica);
+      ew.put(VERSION, version);
+      ew.put(ZkStateReader.STATE_PROP, state.toString());
+      if (isLeader) ew.put(Slice.LEADER, isLeader);
+      ew.putIfNotNull("duplicate", duplicate);
+    }
+
+    private State insert(State duplicate) {
+      assert this.replica.equals(duplicate.replica);
+      if (this.version >= duplicate.version) {
+        if (this.duplicate != null) {
+          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
+        }
+        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+      } else {
+        return duplicate.insert(this);
+      }
+    }
+
+    /**
+     * fetch duplicates entries for this replica
+     */
+    List<State> getDuplicates() {
+      if (duplicate == null) return Collections.emptyList();
+      List<State> result = new ArrayList<>();
+      State current = duplicate;
+      while (current != null) {
+        result.add(current);
+        current = current.duplicate;
+      }
+      return result;
+    }
+
+    private String serialize() {
+      StringBuilder sb = new StringBuilder(replica)
+          .append(":")
+          .append(version)
+          .append(":")
+          .append(state.shortName);
+      if (isLeader) sb.append(":").append("L");
+      return sb.toString();
+    }
+
+
+    @Override
+    public String toString() {
+      return asString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof State) {
+        State that = (State) o;
+        return Objects.equals(this.asString, that.asString);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return asString.hashCode();
+    }
+  }
+
+
+  /** A helper class that encapsulates the various operations performed on per-replica states.
+   * Do not manipulate the per-replica states directly; doing so makes them difficult to debug.
+   *
+   */
+  public static abstract class WriteOps {
+    private PerReplicaStates rs;
+    List<Operation> ops;
+    private boolean preOp = true;
+
+    /**
+     * The state of a replica is changed
+     *
+     * @param newState the new state
+     */
+    public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Operation> refresh(PerReplicaStates rs) {
+          List<Operation> ops = new ArrayList<>(2);
+          State existing = rs.get(replica);
+          if (existing == null) {
+            ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
+          } else {
+            ops.add(new Operation(Operation.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
+            addDeleteStaleNodes(ops, existing);
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
+          }
+          return ops;
+        }
+      }.init(rs);
+    }
+
+    public PerReplicaStates getPerReplicaStates() {
+      return rs;
+    }
+
+
+    /** Switch a collection to or from perReplicaState=true
+     */
+    public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
+      return new WriteOps() {
+        @Override
+        List<Operation> refresh(PerReplicaStates prs) {
+          return enable ? enable(coll) : disable(prs);
+        }
+
+        List<Operation> enable(DocCollection coll) {
+          List<Operation> result = new ArrayList<>();
+          coll.forEachReplica((s, r) -> result.add(new Operation(Operation.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
+          return result;
+        }
+
+        List<Operation> disable(PerReplicaStates prs) {
+          List<Operation> result = new ArrayList<>();
+          prs.states.forEachEntry((s, state) -> result.add(new Operation(Operation.Type.DELETE, state)));
+          return result;
+        }
+      }.init(prs);
+
+    }
+
+    /**
+     * Flip the leader replica to a new one
+     *
+     * @param allReplicas all replicas of the shard
+     * @param next the next leader
+     */
+    public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
+      return new WriteOps() {
+
+        @Override
+        protected List<Operation> refresh(PerReplicaStates rs) {
+          List<Operation> ops = new ArrayList<>();
+          if (next != null) {
+            State st = rs.get(next);
+            if (st != null) {
+              if (!st.isLeader) {
+                ops.add(new Operation(Operation.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+                ops.add(new Operation(Operation.Type.DELETE, st));
+              }
+              //else do not do anything , that node is the leader
+            } else {
+              //there is no entry for the new leader.
+              //create one
+              ops.add(new Operation(Operation.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+            }
+          }
+
+          //now go through all other replicas and unset previous leader
+          for (String r : allReplicas) {
+            State st = rs.get(r);
+            if (st == null) continue;//unlikely
+            if (!Objects.equals(r, next)) {
+              if (st.isLeader) {
+                //some other replica is the leader now. unset
+                ops.add(new Operation(Operation.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+                ops.add(new Operation(Operation.Type.DELETE, st));
+              }
+            }
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
+          }
+          return ops;
+        }
+
+      }.init(rs);
+    }
+
+    /**
+     * Delete a replica entry from per-replica states
+     *
+     * @param replica name of the replica to be deleted
+     */
+    public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Operation> refresh(PerReplicaStates rs) {
+          List<Operation> result;
+          if (rs == null) {
+            result = Collections.emptyList();
+          } else {
+            State state = rs.get(replica);
+            result = addDeleteStaleNodes(new ArrayList<>(), state);
+          }
+          return result;
+        }
+      }.init(rs);
+    }
+
+    public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Operation> refresh(PerReplicaStates rs) {
+          return singletonList(new Operation(Operation.Type.ADD,
+              new State(replica, state, isLeader, 0)));
+        }
+      }.init(rs);
+    }
+
+    /**
+     * Mark a set of replicas as DOWN
+     */
+    public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        List<Operation> refresh(PerReplicaStates rs) {
+          List<Operation> ops = new ArrayList<>();
+          for (String replica : replicas) {
+            State r = rs.get(replica);
+            if (r != null) {
+              if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+              ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+              addDeleteStaleNodes(ops, r);
+            } else {
+              ops.add(new Operation(Operation.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
+            }
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
+          }
+          return ops;
+        }
+      }.init(rs);
+    }
+
+    /**
+     * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of state.json
+     * is updated
+     */
+    public static WriteOps touchChildren() {
+      WriteOps result = new WriteOps() {
+        @Override
+        List<Operation> refresh(PerReplicaStates rs) {
+          List<Operation> ops = new ArrayList<>();
+          State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+          ops.add(new Operation(Operation.Type.ADD, st));
+          ops.add(new Operation(Operation.Type.DELETE, st));
+          if (log.isDebugEnabled()) {
+            log.debug("touchChildren {}", ops);
+          }
+          return ops;
+        }
+      };
+      result.preOp = false;
+      result.ops = result.refresh(null);
+      return result;
+    }
+
+    WriteOps init(PerReplicaStates rs) {
+      if (rs == null) return null;
+      get(rs);
+      return this;
+    }
+
+    public List<Operation> get() {
+      return ops;
+    }
+
+    public List<Operation> get(PerReplicaStates rs) {
+      ops = refresh(rs);
+      if (ops == null) ops = Collections.emptyList();
+      this.rs = rs;
+      return ops;
+    }
+
+    /**
+     * Whether these operations are to be executed before the collection's state.json is persisted
+     */
+    public boolean isPreOp() {
+      return preOp;
+    }
+
+    /**
+     * If a multi operation fails because the state was modified concurrently,
+     * refresh the operations and try again
+     *
+     * @param prs The new state
+     */
+    abstract List<Operation> refresh(PerReplicaStates prs);
+
+    @Override
+    public String toString() {
+      return ops.toString();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
+    appendStates(sb);
+    return sb.append("]}").toString();
+  }
+
+  private StringBuilder appendStates(StringBuilder sb) {
+    states.forEachEntry(new BiConsumer<String, State>() {
+      int count = 0;
+      @Override
+      public void accept(String s, State state) {
+        if (count++ > 0) sb.append(", ");
+        sb.append(state.asString);
+        for (State d : state.getDuplicates()) sb.append(d.asString);
+      }
+    });
+    return sb;
+  }
+
+}
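
A hedged end-to-end sketch of the intended write path, using only APIs defined in this new file (collection and replica names are illustrative):

    static void markActive(String collection, String replica, SolrZkClient zkClient)
        throws KeeperException, InterruptedException {
      // state.json is the parent znode of the per-replica entries
      String znode = ZkStateReader.getCollectionPath(collection);
      PerReplicaStates current = PerReplicaStates.fetch(znode, zkClient, null);
      // flipState() produces an ADD of the replica's new "<name>:<version+1>:A" entry
      // plus DELETEs of its stale entries
      PerReplicaStates.WriteOps ops =
          PerReplicaStates.WriteOps.flipState(replica, Replica.State.ACTIVE, current);
      // persist() retries with a freshly fetched snapshot if the multi() hits stale nodes
      PerReplicaStates.persist(ops, znode, zkClient);
    }
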
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 5f3fd9b..d7fae19 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -17,6 +17,7 @@
 package org.apache.solr.common.cloud;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -29,13 +30,15 @@ import java.util.function.BiPredicate;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.util.Utils;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.ConditionalMapWriter.NON_NULL_VAL;
 import static org.apache.solr.common.ConditionalMapWriter.dedupeKeyPredicate;
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 
 public class Replica extends ZkNodeProps implements MapWriter {
-  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   /**
    * The replica's state. In general, if the node the replica is hosted on is
    * not under {@code /live_nodes} in ZK, the replica's state should be
@@ -53,7 +56,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * {@link ClusterState#liveNodesContain(String)}).
      * </p>
      */
-    ACTIVE,
+    ACTIVE("A"),
     
     /**
      * The first state before {@link State#RECOVERING}. A node in this state
@@ -64,13 +67,13 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * should not be relied on.
      * </p>
      */
-    DOWN,
+    DOWN("D"),
     
     /**
      * The node is recovering from the leader. This might involve peer-sync,
      * full replication or finding out things are already in sync.
      */
-    RECOVERING,
+    RECOVERING("R"),
     
     /**
      * Recovery attempts have not worked, something is not right.
@@ -80,8 +83,16 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * cluster and it's state should be discarded.
      * </p>
      */
-    RECOVERY_FAILED;
-    
+    RECOVERY_FAILED("F");
+
+    /** Short name for a state. Used to encode the state in the per-replica node; see {@link PerReplicaStates.State}
+     */
+    public final String shortName;
+
+    State(String c) {
+      this.shortName = c;
+    }
+
     @Override
     public String toString() {
       return super.toString().toLowerCase(Locale.ROOT);
@@ -89,7 +100,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
     
     /** Converts the state string to a State instance. */
     public static State getState(String stateStr) {
-      return stateStr == null ? null : State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+      return stateStr == null ? null : Replica.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
     }
   }
 
@@ -124,6 +135,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
   public final String core;
   public final Type type;
   public final String shard, collection;
+  private PerReplicaStates.State replicaState;
 
   // mutable
   private State state;
@@ -159,7 +171,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
       this.propMap.putAll(props);
     }
     validate();
-    propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
   }
 
   /**
@@ -181,7 +192,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
     state = State.getState(String.valueOf(details.getOrDefault(ZkStateReader.STATE_PROP, "active")));
     this.propMap.putAll(details);
     validate();
-    propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
+
   }
 
   private final void validate() {
@@ -202,6 +213,8 @@ public class Replica extends ZkNodeProps implements MapWriter {
     propMap.put(ZkStateReader.STATE_PROP, state.toString());
   }
 
+
+
   public String getCollection() {
     return collection;
   }
@@ -237,11 +250,11 @@ public class Replica extends ZkNodeProps implements MapWriter {
   }
 
   public String getCoreUrl() {
-    return ZkCoreNodeProps.getCoreUrl(getBaseUrl(), core);
+    return ZkCoreNodeProps.getCoreUrl(getStr(ZkStateReader.BASE_URL_PROP), core);
   }
 
   public String getBaseUrl() {
-    return getStr(BASE_URL_PROP);
+    return getStr(ZkStateReader.BASE_URL_PROP);
   }
 
   /** SolrCore name. */
@@ -296,6 +309,25 @@ public class Replica extends ZkNodeProps implements MapWriter {
     return propertyValue;
   }
 
+  public Replica copyWith(PerReplicaStates.State state) {
+    log.debug("A replica is updated with new state : {}", state);
+    Map<String, Object> props = new LinkedHashMap<>(propMap);
+    if (state == null) {
+      props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+      props.remove(Slice.LEADER);
+    } else {
+      props.put(ZkStateReader.STATE_PROP, state.state.toString());
+      if (state.isLeader) props.put(Slice.LEADER, "true");
+    }
+    Replica r = new Replica(name, props, collection, shard);
+    r.replicaState = state;
+    return r;
+  }
+
+  public PerReplicaStates.State getReplicaState() {
+    return replicaState;
+  }
+
   public Object clone() {
     return new Replica(name, node, collection, shard, core, state, type, propMap);
   }
@@ -315,13 +347,8 @@ public class Replica extends ZkNodeProps implements MapWriter {
       // propMap takes precedence because it's mutable and we can't control its
       // contents, so a third party may override some declared fields
       for (Map.Entry<String, Object> e : propMap.entrySet()) {
-        final String key = e.getKey();
-        // don't store the base_url as we can compute it from the node_name
-        if (!BASE_URL_PROP.equals(key)) {
-          writer.put(e.getKey(), e.getValue(), p);
-        }
+        writer.put(e.getKey(), e.getValue(), p);
       }
-
       writer.put(ZkStateReader.CORE_NAME_PROP, core, p)
           .put(ZkStateReader.SHARD_ID_PROP, shard, p)
           .put(ZkStateReader.COLLECTION_PROP, collection, p)
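
For orientation, the new State#shortName values above feed the per-replica state entries whose format is asserted by TestPerReplicaStates later in this commit: "<replica>:<version>:<shortName>", with an optional ":L" leader suffix (e.g. "R1:1:A" or "R1:1:D:L"). A minimal, self-contained sketch of that assumed encoding; the demo class and method names are illustrative only, not part of the patch:

    public class ReplicaStateEntryDemo {
      enum State {
        ACTIVE("A"), DOWN("D"), RECOVERING("R"), RECOVERY_FAILED("F");
        final String shortName;
        State(String c) { this.shortName = c; }
      }

      // Assumed encoding: <replicaName>:<version>:<shortName>, with ":L" appended for the leader.
      static String asString(String replicaName, int version, State state, boolean isLeader) {
        return replicaName + ":" + version + ":" + state.shortName + (isLeader ? ":L" : "");
      }

      public static void main(String[] args) {
        System.out.println(asString("R1", 1, State.ACTIVE, false)); // R1:1:A
        System.out.println(asString("R1", 1, State.DOWN, true));    // R1:1:D:L
      }
    }
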
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 4378ef7..802299a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -25,11 +26,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.solr.common.cloud.Replica.Type;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.util.Utils.toJSONString;
 
@@ -37,6 +41,8 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
  */
 public class Slice extends ZkNodeProps implements Iterable<Replica> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   public final String collection;
 
   /** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
@@ -61,6 +67,14 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values().iterator();
   }
 
+  /** Make a copy of this Slice with a modified replica.
+   */
+  public Slice copyWith(Replica modified) {
+    log.debug("modified replica : {}", modified);
+    Map<String, Replica> replicasCopy = new LinkedHashMap<>(replicas);
+    replicasCopy.put(modified.getName(), modified);
+    return new Slice(name, replicasCopy, propMap, collection);
+  }
   /** The slice's state. */
   public enum State {
 
@@ -107,7 +121,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
     /** Converts the state string to a State instance. */
     public static State getState(String stateStr) {
-      return State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+      return Slice.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
     }
   }
 
@@ -138,9 +152,9 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
     Object rangeObj = propMap.get(RANGE);
     if (propMap.get(ZkStateReader.STATE_PROP) != null) {
-      this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
+      this.state = Slice.State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
     } else {
-      this.state = State.ACTIVE;                         //Default to ACTIVE
+      this.state = Slice.State.ACTIVE;                         //Default to ACTIVE
       propMap.put(ZkStateReader.STATE_PROP, state.toString());
     }
     DocRouter.Range tmpRange = null;
@@ -210,7 +224,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   private Replica findLeader() {
     for (Replica replica : replicas.values()) {
-      if (replica.getStr(LEADER) != null) {
+      if (replica.isLeader()) {
         assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT: "Pull replica should not become leader!";
         return replica;
       }
@@ -235,6 +249,10 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values();
   }
 
+  public Set<String> getReplicaNames() {
+    return Collections.unmodifiableSet(replicas.keySet());
+  }
+
   /**
    * Gets all replicas that match a predicate
    */
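
Together with Replica#copyWith(PerReplicaStates.State) above, the new Slice#copyWith(Replica) lets callers rebuild the immutable state objects bottom-up when a single replica changes. A hedged usage sketch; the helper class and method are hypothetical, only the two copyWith calls come from this commit:

    import org.apache.solr.common.cloud.PerReplicaStates;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;

    class SliceCopyDemo {
      // Hypothetical helper: apply one replica's new per-replica state and get back
      // an updated, immutable Slice (both copyWith methods are added by this commit).
      static Slice withUpdatedReplica(Slice slice, Replica replica, PerReplicaStates.State newState) {
        return slice.copyWith(replica.copyWith(newState));
      }
    }
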
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1ac21fe..7a12c92 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -331,6 +331,18 @@ public class SolrZkClient implements Closeable {
   }
 
   /**
+   * Returns children of the node at the path
+   */
+  public List<String> getChildren(final String path, final Watcher watcher, Stat stat, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher), stat));
+    } else {
+      return keeper.getChildren(path, wrapWatcher(watcher), stat);
+    }
+  }
+
+  /**
    * Returns node's data
    */
   public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
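
The point of the new getChildren overload is the Stat argument: the parent znode's cversion increments whenever a per-replica state child is added or removed, which is what ZkStateReader#refreshAndWatchChildren (further down) keys off. A small usage sketch, with a hypothetical helper class and path:

    import java.util.List;

    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.data.Stat;

    class ChildrenCversionDemo {
      // Hypothetical helper: read the per-replica state child names plus the parent
      // znode's cversion in a single call via the new overload.
      static int childrenCversion(SolrZkClient zkClient, String collectionStatePath)
          throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        List<String> children = zkClient.getChildren(collectionStatePath, null, stat, true);
        System.out.println("children: " + children); // e.g. per-replica entries such as "R1:1:A:L"
        return stat.getCversion();
      }
    }
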
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7ff69cf..4d80f1b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -655,7 +655,7 @@ public class ZkStateReader implements SolrCloseable {
 
   private class LazyCollectionRef extends ClusterState.CollectionRef {
     private final String collName;
-    private long lastUpdateTime;
+    private volatile long lastUpdateTime;
     private DocCollection cachedDocCollection;
 
     public LazyCollectionRef(String collName) {
@@ -670,12 +670,12 @@ public class ZkStateReader implements SolrCloseable {
       if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
         boolean shouldFetch = true;
         if (cachedDocCollection != null) {
-          Stat exists = null;
+          Stat freshStats = null;
           try {
-            exists = zkClient.exists(getCollectionPath(collName), null, true);
+            freshStats = zkClient.exists(getCollectionPath(collName), null, true);
           } catch (Exception e) {
           }
-          if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+          if (freshStats != null && !cachedDocCollection.isModified(freshStats.getVersion(), freshStats.getCversion())) {
             shouldFetch = false;
           }
         }
@@ -853,14 +853,18 @@ public class ZkStateReader implements SolrCloseable {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
+    log.debug("getLeaderRetry:@{} {}/{}", System.currentTimeMillis(), collection, shard);
 
+    AtomicReference<DocCollection> coll = new AtomicReference<>();
     AtomicReference<Replica> leader = new AtomicReference<>();
     try {
       waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
         if (c == null)
           return false;
+        coll.set(c);
         Replica l = getLeader(n, c, shard);
         if (l != null) {
+          log.debug("leader found for {}/{} to be {}", collection, shard, l);
           leader.set(l);
           return true;
         }
@@ -1195,9 +1199,11 @@ public class ZkStateReader implements SolrCloseable {
    */
   class StateWatcher implements Watcher {
     private final String coll;
+    private final String collectionPath;
 
     StateWatcher(String coll) {
       this.coll = coll;
+      collectionPath = getCollectionPath(coll);
     }
 
     @Override
@@ -1219,17 +1225,28 @@ public class ZkStateReader implements SolrCloseable {
             event, coll, liveNodes.size());
       }
 
-      refreshAndWatch();
+      refreshAndWatch(event.getType());
 
     }
+    public void refreshAndWatch() {
+      refreshAndWatch(null);
+    }
 
     /**
      * Refresh collection state from ZK and leave a watch for future changes.
      * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
      * with the results of the refresh.
      */
-    public void refreshAndWatch() {
+    public void refreshAndWatch(EventType eventType) {
       try {
+        if (eventType == null || eventType == EventType.NodeChildrenChanged) {
+          refreshAndWatchChildren();
+          if (eventType == EventType.NodeChildrenChanged) {
+            //only per-replica states modified. return
+            return;
+          }
+        }
+
         DocCollection newState = fetchCollectionState(coll, this);
         updateWatchedCollection(coll, newState);
         synchronized (getUpdateLock()) {
@@ -1246,6 +1263,32 @@ public class ZkStateReader implements SolrCloseable {
         log.error("Unwatched collection: [{}]", coll, e);
       }
     }
+
+    private void refreshAndWatchChildren() throws KeeperException, InterruptedException {
+      Stat stat = new Stat();
+      List<String> replicaStates = null;
+      try {
+        replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
+        PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
+        DocCollection oldState = watchedCollectionStates.get(coll);
+        DocCollection newState = null;
+        if (oldState != null) {
+          newState = oldState.copyWith(newStates);
+        } else {
+          newState = fetchCollectionState(coll, null);
+        }
+        updateWatchedCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("updated per-replica states changed for: {}, ver: {} , new vals: {}", coll, stat.getCversion(), replicaStates);
+        }
+
+      } catch (NoNodeException e) {
+        log.info("{} is deleted, stop watching children", collectionPath);
+      }
+    }
   }
 
   /**
@@ -1422,6 +1465,16 @@ public class ZkStateReader implements SolrCloseable {
   private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
     String collectionPath = getCollectionPath(coll);
     while (true) {
+      ClusterState.initReplicaStateProvider(() -> {
+        try {
+          PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
+          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+          return replicaStates;
+        } catch (Exception e) {
+          //TODO
+          throw new RuntimeException(e);
+        }
+      });
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1439,6 +1492,8 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         return null;
+      } finally {
+        ClusterState.clearReplicaStateProvider();
       }
     }
   }
@@ -1557,6 +1612,7 @@ public class ZkStateReader implements SolrCloseable {
         v = new CollectionWatch<>();
         watchSet.set(true);
       }
+      log.info("already watching , added to stateWatchers");
       v.stateWatchers.add(stateWatcher);
       return v;
     });
@@ -1566,11 +1622,26 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
+    state = updatePerReplicaState(state);
     if (stateWatcher.onStateChanged(state) == true) {
       removeDocCollectionWatcher(collection, stateWatcher);
     }
   }
 
+  private DocCollection updatePerReplicaState(DocCollection c) {
+    if (c == null || !c.isPerReplicaState()) return c;
+    PerReplicaStates current = c.getPerReplicaStates();
+    PerReplicaStates newPrs = PerReplicaStates.fetch(c.getZNode(), zkClient, current);
+    if (newPrs != current) {
+      log.debug("just-in-time update for a fresh per-replica-state {}", c.getName());
+      DocCollection modifiedColl = c.copyWith(newPrs);
+      updateWatchedCollection(c.getName(), modifiedColl);
+      return modifiedColl;
+    } else {
+      return c;
+    }
+  }
+
   /**
    * Block until a CollectionStatePredicate returns true, or the wait times out
    *
@@ -1601,12 +1672,18 @@ public class ZkStateReader implements SolrCloseable {
       throw new AlreadyClosedException();
     }
 
+    long waitStartTime = System.currentTimeMillis();
+    log.debug("waiting for collectionState at : {}", waitStartTime);
     final CountDownLatch latch = new CountDownLatch(1);
     waitLatches.add(latch);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     CollectionStateWatcher watcher = (n, c) -> {
       docCollection.set(c);
       boolean matches = predicate.matches(n, c);
+      if (!matches) {
+        log.info("CollectionStatePredicate failed for {}, after {} ms, cversion: {}", collection, (System.currentTimeMillis() - waitStartTime),
+            (c == null || c.getPerReplicaStates() == null ? -1 : c.getPerReplicaStates().cversion));
+      }
       if (matches)
         latch.countDown();
 
@@ -1804,7 +1881,9 @@ public class ZkStateReader implements SolrCloseable {
           break;
         }
       } else {
-        if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+        int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
+        int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
+        if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) {
           // no change to state, but we might have been triggered by the addition of a
           // state watcher, so run notifications
           updated = true;
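
The updated comparison combines two counters: the state.json znode version and the per-replica-states cversion. Restated as a standalone predicate (a sketch assuming the getPerReplicaStates()/cversion accessors introduced in this commit, with a hypothetical demo class), a cached DocCollection is only replaced when at least one of the two has moved forward:

    import org.apache.solr.common.cloud.DocCollection;

    class StateFreshnessDemo {
      // Hypothetical predicate mirroring the check above: newState is worth keeping
      // only if its znode version or its per-replica-states cversion advanced.
      static boolean isNewer(DocCollection oldState, DocCollection newState) {
        int oldCv = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
        int newCv = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
        return newState.getZNodeVersion() > oldState.getZNodeVersion() || newCv > oldCv;
      }
    }
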
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
index 9d22119..c853c7e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/IndexingNestedDocuments.java
@@ -25,14 +25,11 @@ import java.util.List;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-
 import org.apache.solr.cloud.SolrCloudTestCase;
-
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.util.ExternalPaths;
-
 import org.junit.After;
 import org.junit.BeforeClass;
 
@@ -70,7 +67,9 @@ public class IndexingNestedDocuments extends SolrCloudTestCase {
    */
   public void testIndexingAnonKids() throws Exception {
     final String collection = "test_anon";
-    CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, ANON_KIDS_CONFIG, 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.getSolrClient().setDefaultCollection(collection);
     
     //
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
index 4698c7e..3915a00 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiHeatmapFacetingTest.java
@@ -57,7 +57,9 @@ public class JsonRequestApiHeatmapFacetingTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     indexSpatialData();
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
index 83fe1c3..fe0f316 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/JsonRequestApiTest.java
@@ -64,7 +64,8 @@ public class JsonRequestApiTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
index 04776cc..acb6d3e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
@@ -69,6 +69,7 @@ public class UsingSolrJRefGuideExamplesTest extends SolrCloudTestCase {
         .configure();
 
     CollectionAdminResponse response = CollectionAdminRequest.createCollection("techproducts", "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("techproducts", 1, 1);
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
index e8aef51..f839314 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -41,15 +41,14 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.security.BasicAuthPlugin;
 import org.apache.solr.security.RuleBasedAuthorizationPlugin;
 import org.apache.solr.util.TimeOut;
-
-import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
+
 /**
  * tests various streaming expressions (via the SolrJ {@link SolrStream} API) against a SolrCloud cluster
 * using both Authentication and Role based Authorization
@@ -126,7 +125,8 @@ public class CloudAuthStreamTest extends SolrCloudTestCase {
 
     for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
       CollectionAdminRequest.createCollection(collection, "_default", 2, 2)
-        .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
+          .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+          .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
         .process(cluster.getSolrClient());
     }
     
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index 8b74a66..a7abfad 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -74,7 +74,9 @@ public class JDBCStreamTest extends SolrCloudTestCase {
     } else {
       collection = COLLECTIONORALIAS;
     }
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index edef269..b7eaa64 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -65,7 +65,10 @@ public class MathExpressionTest extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index add4331..2fa0dd0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -68,7 +68,9 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     } else {
       collection = COLLECTIONORALIAS;
     }
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
         false, true, TIMEOUT);
     if (useAlias) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 6c88ffe..4d77540 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -96,7 +96,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     
     cluster.waitForActiveCollection(collection, 2, 2);
     
@@ -2679,7 +2680,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("destinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -2773,7 +2775,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -2871,7 +2874,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelDaemonUpdateStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3045,7 +3049,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   public void testParallelTerminatingDaemonUpdateStream() throws Exception {
     Assume.assumeTrue(!useAlias);
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3231,7 +3236,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("destinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("destinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -3324,7 +3330,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection", 2, 2);
 
     new UpdateRequest()
@@ -3422,7 +3429,8 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   @Test
   public void testParallelDaemonCommitStream() throws Exception {
 
-    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("parallelDestinationCollection1", 2, 2);
 
     new UpdateRequest()
@@ -3639,11 +3647,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
   public void testClassifyStream() throws Exception {
     Assume.assumeTrue(!useAlias);
 
-    CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("modelCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("modelCollection", 2, 2);
-    CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("uknownCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("uknownCollection", 2, 2);
-    CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("checkpointCollection", "ml", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("checkpointCollection", 2, 2);
 
     UpdateRequest updateRequest = new UpdateRequest();
@@ -3867,11 +3878,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
   @Test
   public void testExecutorStream() throws Exception {
-    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("workQueue", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("workQueue", 2, 2);
-    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("mainCorpus", 2, 2);
-    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("destination", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
     cluster.waitForActiveCollection("destination", 2, 2);
 
     UpdateRequest workRequest = new UpdateRequest();
@@ -3933,11 +3947,14 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
 
   @Test
   public void testParallelExecutorStream() throws Exception {
-    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("workQueue1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(),DEFAULT_TIMEOUT);
 
-    CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("mainCorpus1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
 
-    CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
+    CollectionAdminRequest.createCollection("destination1", "conf", 2, 1).setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
 
     cluster.waitForActiveCollection("workQueue1", 2, 2);
     cluster.waitForActiveCollection("mainCorpus1", 2, 2);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
index 48f13c2..6a34971 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/DirectJsonQueryRequestFacetingIntegrationTest.java
@@ -24,10 +24,10 @@ import java.util.List;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
-import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
-import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
+import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.util.ExternalPaths;
@@ -56,7 +56,9 @@ public class DirectJsonQueryRequestFacetingIntegrationTest extends SolrCloudTest
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
index 291bcba..94a46c4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestFacetingIntegrationTest.java
@@ -29,10 +29,10 @@ import java.util.Map;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
-import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
-import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
+import org.apache.solr.client.solrj.response.json.NestableJsonFacet;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrDocumentList;
@@ -63,7 +63,9 @@ public class JsonQueryRequestFacetingIntegrationTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
     up.setParam("collection", COLLECTION_NAME);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
index 1ccd581..977d405 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/json/JsonQueryRequestHeatmapFacetingTest.java
@@ -47,7 +47,9 @@ public class JsonQueryRequestHeatmapFacetingTest extends SolrCloudTestCase {
     final List<String> solrUrls = new ArrayList<>();
     solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
 
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG_NAME, 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
 
     indexSpatialData();
   }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
index b0de383..37afd38 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
@@ -97,6 +97,7 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     assertFalse("CloudCollectionsListener not triggered after registration", newResults.get(2).contains("testcollection1"));
 
     CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     client.waitForState("testcollection1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -110,6 +111,7 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     client.getZkStateReader().removeCloudCollectionsListener(watcher1);
 
     CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection2", 4, 4);
 
@@ -136,10 +138,12 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
     CloudSolrClient client = cluster.getSolrClient();
 
     CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection1", 4, 4);
     
     CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
         .processAndWait(client, MAX_WAIT_TIMEOUT);
     cluster.waitForActiveCollection("testcollection2", 4, 4);
 
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 8c19f3e..7b97521 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -33,6 +33,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,7 +122,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     // note: one node in our cluster is unused by the collection
     CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
@@ -169,7 +171,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     final CountDownLatch latch = new CountDownLatch(1);
     client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -199,7 +202,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
@@ -217,6 +221,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   }
 
   @Test
+  @Ignore
+  //nocommit
   public void testCanWaitForNonexistantCollection() throws Exception {
 
     Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
@@ -247,7 +253,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                         (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
@@ -301,7 +308,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @Test
   public void testDeletionsTriggerWatches() throws Exception {
     CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
-      .process(cluster.getSolrClient());
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     
     Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                                               (l, c) -> c == null);
@@ -315,7 +323,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   public void testLiveNodeChangesTriggerWatches() throws Exception {
     final CloudSolrClient client = cluster.getSolrClient();
     
-    CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
+    CollectionAdminRequest.createCollection("test_collection", "config", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(client);
 
     Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
                                               (l, c) -> (l.size() == 1 + CLUSTER_SIZE));
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
new file mode 100644
index 0000000..daeb2a6
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.zookeeper.CreateMode;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestPerReplicaStates extends SolrCloudTestCase {
+  @Before
+  public void prepareCluster() throws Exception {
+    configureCluster(4)
+        .configure();
+  }
+
+  @After
+  public void tearDownCluster() throws Exception {
+    shutdownCluster();
+  }
+
+  public void testBasic() {
+    PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+    assertEquals("R1:1:A", rs.asString);
+
+    rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+    assertEquals("R1:1:D:L", rs.asString);
+    rs = PerReplicaStates.State.parse(rs.asString);
+    assertEquals(State.DOWN, rs.state);
+
+  }
+
+  public void testEntries() {
+    PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+    assertEquals(2, entries.get("R1").version);
+    entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+    assertEquals(2, entries.get("R1").version);
+    assertEquals(2, entries.get("R1").getDuplicates().size());
+    Set<String> modified = PerReplicaStates.findModifiedReplicas(entries,  new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+    assertEquals(1, modified.size());
+    assertTrue(modified.contains("R3"));
+    modified = PerReplicaStates.findModifiedReplicas( entries,
+        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+    assertEquals(2, modified.size());
+    assertTrue(modified.contains("R3"));
+    assertTrue(modified.contains("R4"));
+    modified = PerReplicaStates.findModifiedReplicas( entries,
+        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+    assertEquals(3, modified.size());
+    assertTrue(modified.contains("R3"));
+    assertTrue(modified.contains("R4"));
+    assertTrue(modified.contains("R2"));
+
+
+  }
+
+  public void testReplicaStateOperations() throws Exception {
+    String root = "/testReplicaStateOperations";
+    cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+    ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+    for (String state : states) {
+      cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
+    }
+
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    PerReplicaStates rs = zkStateReader.getReplicaStates(new PerReplicaStates(root, 0, Collections.emptyList()));
+    assertEquals(3, rs.states.size());
+    assertTrue(rs.cversion >= 5);
+
+    List<PerReplicaStates.Operation> ops = PerReplicaStates.WriteOps.addReplica("R5", State.ACTIVE, false, rs).get();
+
+    assertEquals(1, ops.size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD, ops.get(0).typ);
+    PerReplicaStates.persist(ops, root, cluster.getZkClient());
+    rs = zkStateReader.getReplicaStates(root);
+    assertEquals(4, rs.states.size());
+    assertTrue(rs.cversion >= 6);
+    assertEquals(6, cluster.getZkClient().getChildren(root, null, true).size());
+    ops = PerReplicaStates.WriteOps.flipState("R1", State.DOWN, rs).get();
+
+    assertEquals(4, ops.size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD, ops.get(0).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.get(1).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.get(2).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.get(3).typ);
+    PerReplicaStates.persist(ops, root, cluster.getZkClient());
+    rs = zkStateReader.getReplicaStates(root);
+    assertEquals(4, rs.states.size());
+    assertEquals(3, rs.states.get("R1").version);
+
+    ops = PerReplicaStates.WriteOps.deleteReplica("R5", rs).get();
+    assertEquals(1, ops.size());
+    PerReplicaStates.persist(ops, root, cluster.getZkClient());
+
+    rs = zkStateReader.getReplicaStates(root);
+    assertEquals(3, rs.states.size());
+
+    ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4", "R3", "R1"), "R4", rs).get();
+
+    assertEquals(2, ops.size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD, ops.get(0).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.get(1).typ);
+    PerReplicaStates.persist(ops, root, cluster.getZkClient());
+    rs = zkStateReader.getReplicaStates(root);
+    ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4", "R3", "R1"), "R3", rs).get();
+    assertEquals(4, ops.size());
+    PerReplicaStates.persist(ops, root, cluster.getZkClient());
+    rs = zkStateReader.getReplicaStates(root);
+    assertTrue(rs.get("R3").isLeader);
+  }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index b646e2e..c6f26c6 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -81,6 +81,7 @@ import org.slf4j.LoggerFactory;
 public class SolrCloudTestCase extends SolrTestCaseJ4 {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final Boolean USE_PER_REPLICA_STATE = Boolean.parseBoolean(System.getProperty("use.per-replica", "false"));
 
   public static final int DEFAULT_TIMEOUT = 45; // this is an important timeout for test stability - can't be too short