You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/12/15 13:14:32 UTC

[lucene-solr] 01/01: solr13951

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr_13951
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 42b9e598f37800f71d4fbad66588137c7d45a129
Author: noblepaul <no...@gmail.com>
AuthorDate: Wed Dec 16 00:14:03 2020 +1100

    solr13951
---
 .../apache/solr/cloud/overseer/NodeMutator.java    |  15 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  19 +-
 .../apache/solr/cloud/overseer/SliceMutator.java   |  37 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  80 ++-
 .../apache/solr/cloud/overseer/ZkWriteCommand.java |  20 +
 .../client/solrj/impl/SolrClientCloudManager.java  |   3 +
 .../org/apache/solr/common/cloud/ClusterState.java |  71 +++
 .../apache/solr/common/cloud/DocCollection.java    |  71 ++-
 .../apache/solr/common/cloud/PerReplicaStates.java | 587 +++++++++++++++++++++
 .../java/org/apache/solr/common/cloud/Replica.java |  66 ++-
 .../java/org/apache/solr/common/cloud/Slice.java   |  22 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  12 +
 .../apache/solr/common/cloud/ZkStateReader.java    |  99 +++-
 .../solr/common/cloud/TestPerReplicaStates.java    | 137 +++++
 14 files changed, 1203 insertions(+), 36 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 3f1971e..77f0550 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -45,6 +46,8 @@ public class NodeMutator {
 
     Map<String, DocCollection> collections = clusterState.getCollectionsMap();
     for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+       List<String> downedReplicas = new ArrayList<>();
+
       String collection = entry.getKey();
       DocCollection docCollection = entry.getValue();
 
@@ -64,10 +67,11 @@ public class NodeMutator {
           if (rNodeName.equals(nodeName)) {
             log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
             Map<String, Object> props = replica.shallowCopy();
-            Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
-                Replica.State.DOWN, replica.type, props);
+            props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+            Replica newReplica = new Replica(replica.getName(), props, collection, slice.getName());
             newReplicas.put(replica.getName(), newReplica);
             needToUpdateCollection = true;
+            downedReplicas.add(replica.getName());
           }
         }
 
@@ -76,7 +80,12 @@ public class NodeMutator {
       }
 
       if (needToUpdateCollection) {
-        zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+        if(docCollection.isPerReplicaState()) {
+          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
+              PerReplicaStates.WriteOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+        } else {
+          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+        }
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f849143..747cf90 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -40,7 +40,9 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 public class ReplicaMutator {
@@ -57,10 +60,12 @@ public class ReplicaMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
+  protected SolrZkClient zkClient;
 
   public ReplicaMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
+    this.zkClient = getZkClient(cloudManager);
   }
 
   protected Replica setProperty(Replica replica, String key, String value) {
@@ -260,6 +265,7 @@ public class ReplicaMutator {
       log.info("Failed to update state because the replica does not exist, {}", message);
       return ZkStateWriter.NO_OP;
     }
+    boolean persistCollectionState = collection != null && collection.isPerReplicaState();
 
     if (coreNodeName == null) {
       coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
@@ -271,6 +277,7 @@ public class ReplicaMutator {
           log.info("Failed to update state because the replica does not exist, {}", message);
           return ZkStateWriter.NO_OP;
         }
+        persistCollectionState = true;
         // if coreNodeName is null, auto assign one
         coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
       }
@@ -285,6 +292,7 @@ public class ReplicaMutator {
       if (sliceName != null) {
         log.debug("shard={} is already registered", sliceName);
       }
+      persistCollectionState = true;
     }
     if (sliceName == null) {
       //request new shardId
@@ -295,13 +303,15 @@ public class ReplicaMutator {
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
+      persistCollectionState = true;
     }
 
     Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
+    Replica oldReplica = null;
     if (slice != null) {
-      Replica oldReplica = slice.getReplica(coreNodeName);
+      oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -366,7 +376,12 @@ public class ReplicaMutator {
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
     log.debug("Collection is now: {}", newCollection);
-    return new ZkWriteCommand(collectionName, newCollection);
+    if(collection != null && collection.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStates.WriteOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+    } else{
+      return new ZkWriteCommand(collectionName, newCollection);
+    }
   }
 
   private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 40ab1a3..0d95993 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -25,14 +25,17 @@ import java.util.Set;
 import com.google.common.collect.ImmutableSet;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,10 +54,21 @@ public class SliceMutator {
 
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
+  protected final SolrZkClient zkClient;
 
   public SliceMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
+    this.zkClient = getZkClient(cloudManager);
+  }
+
+  static SolrZkClient getZkClient(SolrCloudManager cloudManager) {
+    if (cloudManager instanceof SolrClientCloudManager) {
+      SolrClientCloudManager manager = (SolrClientCloudManager) cloudManager;
+      return manager.getZkClient();
+    } else {
+      return null;
+    }
   }
 
   public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -80,7 +94,14 @@ public class SliceMutator {
             ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
             ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
             ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
-    return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+
+    if(collection.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
+              PerReplicaStates.WriteOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+    } else {
+      return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+    }
   }
 
   public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -124,6 +145,7 @@ public class SliceMutator {
     Slice slice = slices.get(sliceName);
 
     Replica oldLeader = slice.getLeader();
+    Replica newLeader = null;
     final Map<String, Replica> newReplicas = new LinkedHashMap<>();
     for (Replica replica : slice.getReplicas()) {
       // TODO: this should only be calculated once and cached somewhere?
@@ -132,7 +154,7 @@ public class SliceMutator {
       if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
         replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
       } else if (coreURL.equals(leaderUrl)) {
-        replica = new ReplicaMutator(cloudManager).setLeader(replica);
+        newLeader= replica = new ReplicaMutator(cloudManager).setLeader(replica);
       }
 
       newReplicas.put(replica.getName(), replica);
@@ -141,7 +163,16 @@ public class SliceMutator {
     Map<String, Object> newSliceProps = slice.shallowCopy();
     newSliceProps.put(Slice.REPLICAS, newReplicas);
     slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
-    return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+    if(coll.isPerReplicaState()) {
+      PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
+              PerReplicaStates.WriteOps.flipLeader(
+                      slice.getReplicaNames(),
+                      newLeader == null ? null : newLeader.getName(),
+                      prs), false);
+    } else {
+      return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+    }
   }
 
   public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 155fbc2..26e100f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -27,6 +27,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -63,9 +64,10 @@ public class ZkStateWriter {
   protected final ZkStateReader reader;
   protected final Stats stats;
 
-  protected Map<String, DocCollection> updates = new HashMap<>();
+  protected Map<String, ZkWriteCommand> updates = new HashMap<>();
   private int numUpdates = 0;
   protected ClusterState clusterState = null;
+  protected boolean isClusterStateModified = false;
   protected long lastUpdatedTime = 0;
 
   /**
@@ -111,15 +113,49 @@ public class ZkStateWriter {
     if (cmds.isEmpty()) return prevState;
     if (isNoOps(cmds)) return prevState;
 
+    boolean forceFlush = false;
+    if(cmds.size() == 1) {
+      //most messages result in only one command. let's deal with it right away
+      ZkWriteCommand cmd = cmds.get(0);
+      if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+        //we do not wish to batch any updates for collections with per-replica state because
+        // these changes go to individual ZK nodes and there is zero advantage to batching
+        //now check if there are any updates for the same collection already present
+        if (updates.containsKey(cmd.name)) {
+          //this should not happen
+          // but let's get those updates out anyway
+          writeUpdate(updates.remove(cmd.name));
+        }
+        //now let's write the current message
+        try {
+          return writeUpdate(cmd);
+        } finally {
+          if(callback !=null) callback.onWrite();
+        }
+      }
+    } else {
+      //more than one command was created as a result of this message
+      for (ZkWriteCommand cmd : cmds) {
+        if(cmd.collection != null && cmd.collection.isPerReplicaState()) {
+          // we don't try to optimize for this case. let's flush out all after this
+          forceFlush = true;
+          break;
+        }
+      }
+    }
+
+
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
       prevState = prevState.copyWith(cmd.name, cmd.collection);
-      updates.put(cmd.name, cmd.collection);
-      numUpdates++;
+      if (cmd.collection == null) {
+        updates.put(cmd.name, cmd);
+        numUpdates++;
+      }
     }
     clusterState = prevState;
 
-    if (maybeFlushAfter()) {
+    if (forceFlush ||  maybeFlushAfter()) {
       ClusterState state = writePendingUpdates();
       if (callback != null) {
         callback.onWrite();
@@ -147,9 +183,17 @@ public class ZkStateWriter {
   }
 
   public boolean hasPendingUpdates() {
-    return numUpdates != 0;
+    return numUpdates != 0 || isClusterStateModified;
+  }
+  public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
+    Map<String, ZkWriteCommand> commands =  new HashMap<>();
+    commands.put(command.name, command);
+    return writePendingUpdates(commands);
   }
+  public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+    return writePendingUpdates(updates);
 
+  }
   /**
    * Writes all pending updates to ZooKeeper and returns the modified cluster state
    *
@@ -158,20 +202,30 @@ public class ZkStateWriter {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if (!hasPendingUpdates()) return clusterState;
+    if ((updates == this.updates)
+        && !hasPendingUpdates()) {
+      return clusterState;
+    }
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          DocCollection c = entry.getValue();
+          ZkWriteCommand cmd = entry.getValue();
+          DocCollection c = cmd.collection;
 
+          if(cmd.ops != null && cmd.ops.isPreOp()) {
+            PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+            clusterState = clusterState.copyWith(name,
+                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+          }
+          if(!cmd.persistCollState) continue;
           if (c == null) {
             // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
             log.debug("going to delete state.json {}", path);
@@ -192,6 +246,14 @@ public class ZkStateWriter {
               clusterState = clusterState.copyWith(name, newCollection);
             }
           }
+            if(cmd.ops != null && !cmd.ops.isPreOp()) {
+              PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+              DocCollection currentCollState = clusterState.getCollection(cmd.name);
+              if ( currentCollState != null) {
+                clusterState = clusterState.copyWith(name,
+                        currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+              }
+          }
         }
 
         updates.clear();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index d464863..2f71674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,16 +17,34 @@
 package org.apache.solr.cloud.overseer;
 
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 
 public class ZkWriteCommand {
+
   public final String name;
   public final DocCollection collection;
+
   public final boolean noop;
+  // persist the collection state. If this is false, it means the collection state is not modified
+  public final boolean persistCollState;
+  public final PerReplicaStates.WriteOps ops;
 
+  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStates.WriteOps replicaOps, boolean persistCollState) {
+    boolean isPerReplicaState = collection.isPerReplicaState();
+    this.name = name;
+    this.collection = collection;
+    this.noop = false;
+    this.ops = isPerReplicaState ? replicaOps : null;
+    this.persistCollState = isPerReplicaState ? persistCollState : true;
+  }
   public ZkWriteCommand(String name, DocCollection collection) {
     this.name = name;
     this.collection = collection;
     this.noop = false;
+    persistCollState = true;
+    this.ops = collection != null && collection.isPerReplicaState() ?
+        PerReplicaStates.WriteOps.touchChildren():
+        null;
   }
 
   /**
@@ -36,6 +54,8 @@ public class ZkWriteCommand {
     this.noop = true;
     this.name = null;
     this.collection = null;
+    this.ops = null;
+    persistCollState = true;
   }
 
   public static ZkWriteCommand noop() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 5ad7ff4..7724b58 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -187,6 +187,9 @@ public class SolrClientCloudManager implements SolrCloudManager {
       return EMPTY;
     }
   }
+  public SolrZkClient getZkClient() {
+    return zkClient;
+  }
 
   @Override
   public DistributedQueueFactory getDistributedQueueFactory() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index ebed0ff..5f8ee6b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -23,15 +24,19 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Immutable state of the cloud. Normally you can get the state by using
@@ -39,6 +44,8 @@ import org.noggit.JSONWriter;
  * @lucene.experimental
  */
 public class ClusterState implements JSONWriter.Writable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
 
   private final Map<String, CollectionRef> collectionStates, immutableCollectionStates;
   private Set<String> liveNodes;
@@ -241,6 +248,12 @@ public class ClusterState implements JSONWriter.Writable {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
+    if("true".equals(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
+      log.info("a collection {} has per-replica state" , name);
+      //this collection has replica states stored outside
+      ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
+      if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
+    }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(DocCollection.SHARDS);
     if (sliceObjs == null) {
@@ -383,5 +396,63 @@ public class ClusterState implements JSONWriter.Writable {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+  }
+  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+    REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
+  }
+
+
+  public static void clearReplicaStateProvider(){
+    REPLICASTATES_PROVIDER.remove();
+  }
+
+  private static class StatesProvider implements ReplicaStatesProvider {
+    private final Supplier<PerReplicaStates> replicaStatesSupplier;
+    private PerReplicaStates perReplicaStates;
+    private boolean isPerReplicaState = false;
+
+    public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+      this.replicaStatesSupplier = replicaStatesSupplier;
+    }
+
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return isPerReplicaState ? Optional.of(this) : Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      if(perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
+      return perReplicaStates;
+    }
+
+  }
+
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index c35ee8a..7839532 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -30,6 +31,8 @@ import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
@@ -42,9 +45,12 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
  */
 public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
 
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
+  public static final String PER_REPLICA_STATE = "perReplicaState";
 
   private final int znodeVersion;
 
@@ -55,12 +61,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Map<String, List<Replica>> nodeNameReplicas;
   private final Map<String, List<Replica>> nodeNameLeaderReplicas;
   private final DocRouter router;
+  private final String znode;
 
   private final Integer replicationFactor;
   private final Integer numNrtReplicas;
   private final Integer numTlogReplicas;
   private final Integer numPullReplicas;
+  private final Boolean perReplicaState;
   private final Boolean readOnly;
+  private final Map<String, Replica> replicaMap = new HashMap<>();
+  private volatile PerReplicaStates perReplicaStates;
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
     this(name, slices, props, router, Integer.MAX_VALUE);
@@ -86,6 +96,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
     this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
     this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
+    this.perReplicaState = (Boolean) verifyProp(props, PER_REPLICA_STATE, Boolean.FALSE);
     Boolean readOnly = (Boolean) verifyProp(props, READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
     
@@ -98,13 +109,42 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       }
       for (Replica replica : slice.getValue()) {
         addNodeNameReplica(replica);
+        if(perReplicaState) {
+          replicaMap.put(replica.getName(), replica);
+        }
       }
     }
     this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
     this.router = router;
+    this.znode = ZkStateReader.getCollectionPath(name);
     assert name != null && slices != null;
   }
 
+  /**Returns a copy of this collection with its replicas updated from the given {@link PerReplicaStates},
+   * or this same instance when the child-node version is unchanged or no replica was modified
+   */
+  public DocCollection copyWith( PerReplicaStates newPerReplicaStates) {
+    log.debug("collection :{} going to be updated :  per-replica state :{} -> {}",
+        name,
+        getChildNodesVersion(), newPerReplicaStates.cversion);
+    if(getChildNodesVersion() == newPerReplicaStates.cversion) return this;
+    Set<String> modifiedReplicas = PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
+    if(modifiedReplicas.isEmpty()) return this; //nothing is modified
+    Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
+    for (String s : modifiedReplicas) {
+      Replica replica = getReplica(s);
+      if(replica != null) {
+        Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
+        Slice shard = modifiedShards.get(replica.shard);
+        modifiedShards.put(replica.shard, shard.copyWith(newReplica));
+      }
+    }
+    DocCollection result = new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion);
+    result.perReplicaStates = newPerReplicaStates;
+    return result;
+
+  }
+
   private void addNodeNameReplica(Replica replica) {
     List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
     if (replicas == null) {
@@ -138,6 +178,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
         return Integer.parseInt(o.toString());
       case READ_ONLY:
         return Boolean.parseBoolean(o.toString());
+      case PER_REPLICA_STATE:
+        return Boolean.parseBoolean(o.toString());
       case "snitch":
       default:
         return o;
@@ -224,6 +266,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public int getZNodeVersion(){
     return znodeVersion;
   }
+  public int getChildNodesVersion() {
+    return perReplicaStates == null ? -1 : perReplicaStates.cversion;
+  }
+
+  public boolean isModified(int dataVersion, int childVersion) {
+    if(dataVersion > znodeVersion) return true;
+    if(childVersion > getChildNodesVersion()) return true;
+    return false;
+
+  }
 
   /**
    * @return replication factor for this collection or null if no
@@ -232,7 +284,12 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Integer getReplicationFactor() {
     return replicationFactor;
   }
-  
+
+  public String getZNode(){
+    return znode;
+  }
+
+
   public DocRouter getRouter() {
     return router;
   }
@@ -255,6 +312,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   }
 
   public Replica getReplica(String coreNodeName) {
+    if(perReplicaState) {
+      return replicaMap.get(coreNodeName);
+    }
     for (Slice slice : slices.values()) {
       Replica replica = slice.getReplica(coreNodeName);
       if (replica != null) return replica;
@@ -375,6 +435,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Integer getNumPullReplicas() {
     return numPullReplicas;
   }
+  public boolean isPerReplicaState() {
+    return Boolean.TRUE.equals(perReplicaState);
+  }
+
+  public PerReplicaStates getPerReplicaStates() {
+    return perReplicaStates;
+  }
+
 
   public int getExpectedReplicaCount(Replica.Type type, int def) {
     Integer result = null;
@@ -382,5 +450,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     if (type == Replica.Type.PULL) result = numPullReplicas;
     if (type == Replica.Type.TLOG) result = numTlogReplicas;
     return result == null ? def : result;
+
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
new file mode 100644
index 0000000..3eb2ca2
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    public static final char SEPARATOR = ':';
+
+
+    @JsonProperty
+    public final String path;
+
+    @JsonProperty
+    public final int cversion;
+
+    @JsonProperty
+    public final SimpleMap<State> states;
+
+    /**
+     * Builds the states from the raw child-node names. Names that do not parse are skipped; several
+     * entries for the same replica are folded into a single State through State.insert.
+     */
+    public PerReplicaStates(String path, int cversion, List<String> states) {
+        this.path = path;
+        this.cversion = cversion;
+        Map<String, State> byReplica = new LinkedHashMap<>();
+        for (String serialized : states) {
+            State parsed = State.parse(serialized);
+            if (parsed != null) {
+                byReplica.merge(parsed.replica, parsed, (existing, incoming) -> incoming.insert(existing));
+            }
+        }
+        this.states = new WrappedSimpleMap<>(byReplica);
+    }
+
+    /**
+     * Names of replicas whose state differs between {@code old} and {@code fresh}, or that exist in
+     * only one of them. When {@code fresh} is null every replica in {@code old} is reported.
+     */
+    public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+        Set<String> changed = new HashSet<>();
+        if (fresh == null) {
+            old.states.forEachKey(changed::add);
+            return changed;
+        }
+        // present in old, but different or absent in fresh
+        old.states.forEachEntry((name, oldState) -> {
+            if (!Objects.equals(fresh.get(name), oldState)) changed.add(name);
+        });
+        // present only in fresh
+        fresh.states.forEachEntry((name, ignored) -> {
+            if (old.get(name) == null) changed.add(name);
+        });
+        return changed;
+    }
+
+    /**
+     * Persist the operations currently held by {@code ops}. If ZooKeeper rejects the write with
+     * NodeExists or NoNode (the local view is stale), the states are re-fetched from {@code znode},
+     * the operations are recomputed via {@code ops.get(...)} and the write is attempted once more.
+     * A failure of that second attempt is propagated to the caller.
+     */
+    public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+        try {
+            persist(ops.get(), znode, zkClient);
+        } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
+            //state is stale
+            log.info("stale state for {} . retrying...", znode);
+            List<Op> freshOps = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
+            persist(freshOps, znode, zkClient);
+            log.info("retried for stale state {}, succeeded", znode);
+        }
+    }
+
+    /**
+     * Persist a set of operations to Zookeeper as a single multi-op. An ADD creates an empty
+     * persistent child of {@code znode} named after the serialized state; a DELETE removes it.
+     * A null or empty list is a no-op.
+     *
+     * @throws KeeperException if the multi-op is rejected; the original exception is rethrown
+     */
+    public static void persist(List<Op> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+        if (operations == null || operations.isEmpty()) return;
+        log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
+
+        List<org.apache.zookeeper.Op> ops = new ArrayList<>(operations.size());
+        for (Op op : operations) {
+            //the state of the replica is being updated
+            String path = znode + "/" + op.state.asString;
+            List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
+            ops.add(op.typ == Op.Type.ADD ?
+                    org.apache.zookeeper.Op.create(path, null, acls, CreateMode.PERSISTENT) :
+                    org.apache.zookeeper.Op.delete(path, -1));
+        }
+        try {
+            zkClient.multi(ops, true);
+        } catch (KeeperException e) {
+            // Listing the children is diagnostics only: a ZooKeeper failure while doing so
+            // must not replace the exception that actually caused the write to fail.
+            Object children;
+            try {
+                children = zkClient.getChildren(znode, null, true);
+            } catch (KeeperException | RuntimeException diag) {
+                children = "<unavailable: " + diag + ">";
+            }
+            log.error("multi op exception : {} , children: {}", e.getMessage(), children, e);
+            throw e;
+        }
+        if (log.isDebugEnabled()) {
+            // Best-effort diagnostics; the write has already succeeded at this point.
+            try {
+                Stat stat = zkClient.exists(znode, null, true);
+                log.debug("After update, cversion : {}", stat == null ? null : stat.getCversion());
+            } catch (KeeperException | RuntimeException e) {
+                log.debug("Could not read cversion of {} after update", znode, e);
+            }
+        }
+    }
+
+
+    /**
+     * Fetch the latest {@link PerReplicaStates}. When {@code current} is given, the
+     * {@link Stat#getCversion()} of {@code current.path} is checked first: if it equals
+     * {@code current.cversion} the same object is returned, and if the node does not exist an empty
+     * instance with cversion -1 is returned. Otherwise the children of {@code path} are read and a
+     * new instance is built from them.
+     *
+     * @throws SolrException (SERVER_ERROR) wrapping a KeeperException or InterruptedException
+     */
+    public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+        try {
+            if (current != null) {
+                Stat stat = zkClient.exists(current.path, null, true);
+                if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+                if (current.cversion == stat.getCversion()) return current;// not modified
+            }
+            Stat stat = new Stat();
+            List<String> children = zkClient.getChildren(path, null, stat, true);
+            return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+        } catch (KeeperException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+        } catch (InterruptedException e) {
+            SolrZkClient.checkInterrupted(e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+        }
+    }
+
+
+    /** Appends a DELETE op for {@code rs} and for every duplicate chained behind it; returns {@code ops}. */
+    private static List<Op> addDeleteStaleNodes(List<Op> ops, State rs) {
+        for (State stale = rs; stale != null; stale = stale.duplicate) {
+            ops.add(new Op(Op.Type.DELETE, stale));
+        }
+        return ops;
+    }
+
+    /**
+     * Returns the text before the first {@link #SEPARATOR} in {@code s}, or null when there is no
+     * separator or it is the very first character.
+     */
+    public static String getReplicaName(String s) {
+        int sep = s.indexOf(SEPARATOR);
+        return sep > 0 ? s.substring(0, sep) : null;
+    }
+
+    public State get(String replica) {
+        return states.get(replica);
+    }
+
+    public static class Op {
+        public final Type typ;
+        public final State state;
+
+        public Op(Type typ, State replicaState) {
+            this.typ = typ;
+            this.state = replicaState;
+        }
+
+
+        public enum Type {
+            //add a new node
+            ADD,
+            //delete an existing node
+            DELETE
+        }
+
+        @Override
+        public String toString() {
+            return typ.toString() + " : " + state;
+        }
+    }
+
+
+    /**
+     * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+     */
+    public static class State implements MapWriter {
+
+        public final String replica;
+
+        public final Replica.State state;
+
+        public final Boolean isLeader;
+
+        public final int version;
+
+        public final String asString;
+
+        /**
+         * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+         * <p>
+         * the entry with '13' is the latest and the one with '12' is considered a duplicate
+         * <p>
+         * These are unlikely, but possible
+         */
+        final State duplicate;
+
+        private State(String serialized, List<String> pieces) {
+            this.asString = serialized;
+            replica = pieces.get(0);
+            version = Integer.parseInt(pieces.get(1));
+            String encodedStatus = pieces.get(2);
+            this.state = Replica.getState(encodedStatus);
+            isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+            duplicate = null;
+        }
+
+        public static State parse(String serialized) {
+            List<String> pieces = StrUtils.splitSmart(serialized, ':');
+            if (pieces.size() < 3) return null;
+            return new State(serialized, pieces);
+
+        }
+
+        public State(String replica, Replica.State state, Boolean isLeader, int version) {
+            this(replica, state, isLeader, version, null);
+        }
+
+        public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+            this.replica = replica;
+            this.state = state == null ? Replica.State.ACTIVE : state;
+            this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+            this.version = version;
+            asString = serialize();
+            this.duplicate = duplicate;
+        }
+
+        @Override
+        public void writeMap(EntryWriter ew) throws IOException {
+            ew.put(NAME, replica);
+            ew.put(VERSION, version);
+            ew.put(ZkStateReader.STATE_PROP, state.toString());
+            if (isLeader) ew.put(Slice.LEADER, isLeader);
+            ew.putIfNotNull("duplicate", duplicate);
+        }
+
+        private State insert(State duplicate) {
+            assert this.replica.equals(duplicate.replica);
+            if (this.version >= duplicate.version) {
+                if (this.duplicate != null) {
+                    duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
+                }
+                return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+            } else {
+                return duplicate.insert(this);
+            }
+        }
+
+        /**
+         * Collects the duplicate entries chained behind this state, in chain order.
+         * Returns an immutable empty list when there are none.
+         */
+        List<State> getDuplicates() {
+            if (duplicate == null) return Collections.emptyList();
+            List<State> chain = new ArrayList<>();
+            for (State d = duplicate; d != null; d = d.duplicate) {
+                chain.add(d);
+            }
+            return chain;
+        }
+
+        /** Encodes this state as {@code replica:version:shortName}, with a trailing {@code :L} for a leader. */
+        private String serialize() {
+            StringBuilder encoded = new StringBuilder(replica);
+            encoded.append(SEPARATOR).append(version);
+            encoded.append(SEPARATOR).append(state.shortName);
+            if (isLeader) {
+                encoded.append(SEPARATOR).append('L');
+            }
+            return encoded.toString();
+        }
+
+
+        @Override
+        public String toString() {
+            return asString;
+        }
+
+        /** Two states are equal when their serialized forms are equal. */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof State)) return false;
+            return Objects.equals(asString, ((State) o).asString);
+        }
+
+        @Override
+        public int hashCode() {
+            return asString.hashCode();
+        }
+    }
+
+
+    public static abstract class WriteOps {
+        private PerReplicaStates rs;
+        List<Op> ops;
+        private boolean preOp = true;
+
+        /**
+         * state of a replica is changed
+         *
+         * @param newState the new state
+         */
+        public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
+            return new WriteOps() {
+                @Override
+                protected List<Op> refresh(PerReplicaStates rs) {
+                    List<Op> ops = new ArrayList<>(2);
+                    State existing = rs.get(replica);
+                    if (existing == null) {
+                        ops.add(new Op(Op.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
+                    } else {
+                        ops.add(new Op(Op.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
+                        addDeleteStaleNodes(ops, existing);
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
+                    }
+                    return ops;
+                }
+            }.init(rs);
+        }
+
+        public PerReplicaStates getPerReplicaStates() {
+            return rs;
+        }
+
+
+        /**Switch a collection from/to perReplicaState=true
+         */
+        public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
+            return new WriteOps() {
+                @Override
+                List<Op> refresh(PerReplicaStates prs) {
+                    return enable ? enable(coll) : disable(prs);
+                }
+
+                List<Op> enable(DocCollection coll) {
+                    List<Op> result = new ArrayList<>();
+                    coll.forEachReplica((s, r) -> result.add(new Op(Op.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
+                    return result;
+                }
+
+                List<Op> disable(PerReplicaStates prs) {
+                    List<Op> result = new ArrayList<>();
+                    prs.states.forEachEntry((s, state) -> result.add(new Op(Op.Type.DELETE, state)));
+                    return result;
+                }
+            }.init(prs);
+
+        }
+
+        /**
+         * Flip the leader replica to a new one
+         *
+         * @param allReplicas  allReplicas of the shard
+         * @param next next leader
+         */
+        public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
+            return new WriteOps() {
+
+                @Override
+                protected List<Op> refresh(PerReplicaStates rs) {
+                    List<Op> ops = new ArrayList<>(4);
+                    if(next != null) {
+                        State st = rs.get(next);
+                        if (st != null) {
+                            if (!st.isLeader) {
+                                ops.add(new Op(Op.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+                                ops.add(new Op(Op.Type.DELETE, st));
+                            }
+                            //else do not do anything , that node is the leader
+                        } else {
+                            //there is no entry for the new leader.
+                            //create one
+                            ops.add(new Op(Op.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+                        }
+                    }
+
+                    //now go through all other replicas and unset previous leader
+                    for (String r : allReplicas) {
+                        State st = rs.get(r);
+                        if (st == null) continue;//unlikely
+                        if (!Objects.equals(r, next)) {
+                            if(st.isLeader) {
+                                //some other replica is the leader now. unset
+                                ops.add(new Op(Op.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+                                ops.add(new Op(Op.Type.DELETE, st));
+                            }
+                        }
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
+                    }
+                    return ops;
+                }
+
+            }.init(rs);
+        }
+
+        /**
+         * Delete a replica entry from per-replica states
+         *
+         * @param replica name of the replica to be deleted
+         */
+        public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
+            return new WriteOps() {
+                @Override
+                protected List<Op> refresh(PerReplicaStates rs) {
+                    List<Op> result;
+                    if (rs == null) {
+                        result = Collections.emptyList();
+                    } else {
+                        State state = rs.get(replica);
+                        result = addDeleteStaleNodes(new ArrayList<>(), state);
+                    }
+                    return result;
+                }
+            }.init(rs);
+        }
+
+        public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
+            return new WriteOps() {
+                @Override
+                protected List<Op> refresh(PerReplicaStates rs) {
+                    return singletonList(new Op(Op.Type.ADD,
+                            new State(replica, state, isLeader, 0)));
+                }
+            }.init(rs);
+        }
+
+        /**
+         * mark a bunch of replicas as DOWN
+         */
+        public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
+            return new WriteOps() {
+                @Override
+                List<Op> refresh(PerReplicaStates rs) {
+                    List<Op> ops = new ArrayList<>();
+                    for (String replica : replicas) {
+                        State r = rs.get(replica);
+                        if (r != null) {
+                            if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+                            ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+                            addDeleteStaleNodes(ops, r);
+                        } else {
+                            ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
+                        }
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
+                    }
+                    return ops;
+                }
+            }.init(rs);
+        }
+
+        /**
+         * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of state.json
+         * is updated
+         */
+        public static WriteOps touchChildren() {
+            WriteOps result = new WriteOps() {
+                @Override
+                List<Op> refresh(PerReplicaStates rs) {
+                    List<Op> ops = new ArrayList<>();
+                    State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+                    ops.add(new Op(Op.Type.ADD, st));
+                    ops.add(new Op(Op.Type.DELETE, st));
+                    if (log.isDebugEnabled()) {
+                        log.debug("touchChildren {}", ops);
+                    }
+                    return ops;
+                }
+            };
+            result.preOp = false;
+            result.ops = result.refresh(null);
+            return result;
+        }
+
+        /**
+         * Computes the operations against {@code rs} and returns this instance.
+         * Returns null (not this) when {@code rs} is null, so any factory that ends with
+         * {@code .init(rs)} hands back null for a null {@link PerReplicaStates}.
+         */
+        WriteOps init(PerReplicaStates rs) {
+            if (rs == null) return null;
+            get(rs);
+            return this;
+        }
+
+        public List<Op> get() {
+            return ops;
+        }
+
+        public List<Op> get(PerReplicaStates rs) {
+            ops = refresh(rs);
+            if (ops == null) ops = Collections.emptyList();
+            this.rs = rs;
+            return ops;
+        }
+
+        /**
+         * To be executed before collection state.json is persisted
+         */
+        public boolean isPreOp() {
+            return preOp;
+        }
+
+        /**
+         * if a multi operation fails because the state got modified from behind,
+         * refresh the operation and try again
+         *
+         * @param prs The new state
+         */
+        abstract List<Op> refresh(PerReplicaStates prs);
+
+        /** Renders the pending operations; yields "null" instead of throwing when none were computed yet. */
+        @Override
+        public String toString() {
+            return String.valueOf(ops);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
+        appendStates(sb);
+        return sb.append("]}").toString();
+    }
+
+    /** Appends every state, each followed by its duplicates, as one comma-separated list. */
+    private StringBuilder appendStates(StringBuilder sb) {
+        states.forEachEntry(new BiConsumer<String, State>() {
+            int count = 0;
+            @Override
+            public void accept(String s, State state) {
+                if(count++ > 0) sb.append(", ");
+                sb.append(state.asString);
+                // delimit duplicates too; otherwise they run straight into the preceding entry
+                for (State d : state.getDuplicates()) sb.append(", ").append(d.asString);
+            }
+        });
+        return sb;
+    }
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 5f3fd9b..e6c412c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -17,6 +17,7 @@
 package org.apache.solr.common.cloud;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -29,12 +30,15 @@ import java.util.function.BiPredicate;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.util.Utils;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.ConditionalMapWriter.NON_NULL_VAL;
 import static org.apache.solr.common.ConditionalMapWriter.dedupeKeyPredicate;
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 
 public class Replica extends ZkNodeProps implements MapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
   /**
    * The replica's state. In general, if the node the replica is hosted on is
@@ -53,7 +57,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * {@link ClusterState#liveNodesContain(String)}).
      * </p>
      */
-    ACTIVE,
+    ACTIVE("A"),
     
     /**
      * The first state before {@link State#RECOVERING}. A node in this state
@@ -64,13 +68,13 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * should not be relied on.
      * </p>
      */
-    DOWN,
+    DOWN("D"),
     
     /**
      * The node is recovering from the leader. This might involve peer-sync,
      * full replication or finding out things are already in sync.
      */
-    RECOVERING,
+    RECOVERING("R"),
     
     /**
      * Recovery attempts have not worked, something is not right.
@@ -80,8 +84,16 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * cluster and it's state should be discarded.
      * </p>
      */
-    RECOVERY_FAILED;
-    
+    RECOVERY_FAILED("F");
+
+    /**short name for a state. Used to encode this in the state node see {@link PerReplicaStates.State}
+     */
+    public final String shortName;
+
+    State(String c) {
+      this.shortName = c;
+    }
+
     @Override
     public String toString() {
       return super.toString().toLowerCase(Locale.ROOT);
@@ -89,7 +101,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
     
     /** Converts the state string to a State instance. */
     public static State getState(String stateStr) {
-      return stateStr == null ? null : State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+      return stateStr == null ? null : Replica.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
     }
   }
 
@@ -125,6 +137,8 @@ public class Replica extends ZkNodeProps implements MapWriter {
   public final Type type;
   public final String shard, collection;
 
+  private PerReplicaStates.State replicaState;
+
   // mutable
   private State state;
 
@@ -177,9 +191,18 @@ public class Replica extends ZkNodeProps implements MapWriter {
     this.shard = String.valueOf(details.get("shard"));
     this.core = String.valueOf(details.get("core"));
     this.node = String.valueOf(details.get("node_name"));
-    type = Replica.Type.valueOf(String.valueOf(details.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
-    state = State.getState(String.valueOf(details.getOrDefault(ZkStateReader.STATE_PROP, "active")));
+
     this.propMap.putAll(details);
+    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
+      log.debug("A replica  {} state fetched from per-replica state", name);
+      replicaState = it.getStates().get(name);
+      if(replicaState!= null) {
+        propMap.put(ZkStateReader.STATE_PROP, replicaState.state.toString().toLowerCase(Locale.ROOT));
+        if (replicaState.isLeader) propMap.put(Slice.LEADER, "true");
+      }
+    }) ;
+    type = Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
+    if(state == null) state = State.getState(String.valueOf(propMap.getOrDefault(ZkStateReader.STATE_PROP, "active")));
     validate();
     propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
   }
@@ -273,6 +296,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
   }
 
   public boolean isLeader() {
+    if(replicaState != null) return replicaState.isLeader;
     return getBool(ZkStateReader.LEADER_PROP, false);
   }
 
@@ -295,6 +319,21 @@ public class Replica extends ZkNodeProps implements MapWriter {
     final String propertyValue = getStr(propertyKey);
     return propertyValue;
   }
+  /**
+   * Creates a copy of this replica whose props reflect the given per-replica state.
+   * A null {@code state} sets the state prop to DOWN and clears the leader prop.
+   *
+   * @param state the new per-replica state, may be null
+   */
+  public Replica copyWith(PerReplicaStates.State state) {
+    log.debug("A replica is updated with new state : {}", state);
+    Map<String, Object> props = new LinkedHashMap<>(propMap);
+    if (state == null) {
+      props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+      props.remove(Slice.LEADER);
+    } else {
+      props.put(ZkStateReader.STATE_PROP, state.state.toString());
+      if (state.isLeader) {
+        props.put(Slice.LEADER, "true");
+      } else {
+        // propMap may still carry a leader flag from the previous state; drop it so the
+        // copied props agree with what isLeader() reports for the new state
+        props.remove(Slice.LEADER);
+      }
+    }
+    Replica r = new Replica(name, props, collection, shard);
+    r.replicaState = state;
+    return r;
+  }
+
 
   public Object clone() {
     return new Replica(name, node, collection, shard, core, state, type, propMap);
@@ -305,6 +344,17 @@ public class Replica extends ZkNodeProps implements MapWriter {
     ew.put(name, _allPropsWriter());
   }
 
+  private static final Map<String, State> STATES = new HashMap<>();
+  static {
+    STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE);
+    STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN);
+    STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING);
+    STATES.put(Replica.State.RECOVERY_FAILED.shortName, Replica.State.RECOVERY_FAILED);
+  }
+  public static State getState(String  c) {
+    return STATES.get(c);
+  }
+
 
   private MapWriter _allPropsWriter() {
     BiPredicate<CharSequence, Object> p = dedupeKeyPredicate(new HashSet<>())
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 4378ef7..23796fb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.common.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -25,11 +26,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.solr.common.cloud.Replica.Type;
 import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.util.Utils.toJSONString;
 
@@ -37,6 +41,8 @@ import static org.apache.solr.common.util.Utils.toJSONString;
  * A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
  */
 public class Slice extends ZkNodeProps implements Iterable<Replica> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   public final String collection;
 
   /** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
@@ -61,6 +67,16 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values().iterator();
   }
 
+  /**
+   * Returns a new Slice built from this one's name, props and collection, in which
+   * {@code modified} replaces (or is added as) the replica registered under its name.
+   */
+  public Slice copyWith(Replica modified) {
+    log.debug("modified replica : {}", modified);
+    Map<String, Replica> updated = new LinkedHashMap<>(replicas);
+    updated.put(modified.getName(), modified);
+    return new Slice(name, updated, propMap, collection);
+  }
   /** The slice's state. */
   public enum State {
 
@@ -210,7 +226,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   private Replica findLeader() {
     for (Replica replica : replicas.values()) {
-      if (replica.getStr(LEADER) != null) {
+      if (replica.isLeader()) {
         assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT: "Pull replica should not become leader!";
         return replica;
       }
@@ -235,6 +251,10 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     return replicas.values();
   }
 
+  public Set<String> getReplicaNames() {
+    return Collections.unmodifiableSet(replicas.keySet()); // read-only view over the replica map's keys, not a copy
+  }
+
   /**
    * Gets all replicas that match a predicate
    */
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1ac21fe..7a12c92 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -331,6 +331,18 @@ public class SolrZkClient implements Closeable {
   }
 
   /**
+   * Returns the children of the node at {@code path}, handing the wrapped {@code watcher} and {@code stat} to ZooKeeper; the call goes through zkCmdExecutor.retryOperation when {@code retryOnConnLoss} is true.
+   */
+  public List<String> getChildren(final String path, final Watcher watcher,Stat stat, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher) , stat));
+    } else {
+      return keeper.getChildren(path, wrapWatcher(watcher), stat);
+    }
+  }
+
+  /**
    * Returns node's data
    */
   public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7ff69cf..118dc05 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -23,11 +23,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -655,7 +657,7 @@ public class ZkStateReader implements SolrCloseable {
 
   private class LazyCollectionRef extends ClusterState.CollectionRef {
     private final String collName;
-    private long lastUpdateTime;
+    private volatile long lastUpdateTime;
     private DocCollection cachedDocCollection;
 
     public LazyCollectionRef(String collName) {
@@ -675,7 +677,7 @@ public class ZkStateReader implements SolrCloseable {
             exists = zkClient.exists(getCollectionPath(collName), null, true);
           } catch (Exception e) {
           }
-          if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+          if (exists != null && !cachedDocCollection.isModified(exists.getVersion(), exists.getCversion())) {
             shouldFetch = false;
           }
         }
@@ -1195,9 +1197,11 @@ public class ZkStateReader implements SolrCloseable {
    */
   class StateWatcher implements Watcher {
     private final String coll;
+    private final String collectionPath;
 
     StateWatcher(String coll) {
       this.coll = coll;
+      collectionPath = getCollectionPath(coll); // computed once up front; refreshAndWatchChildren reads it on every refresh
     }
 
     @Override
@@ -1216,20 +1220,32 @@ public class ZkStateReader implements SolrCloseable {
       Set<String> liveNodes = ZkStateReader.this.liveNodes;
       if (log.isInfoEnabled()) {
         log.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
-            event, coll, liveNodes.size());
+                event, coll, liveNodes.size());
       }
 
-      refreshAndWatch();
+      refreshAndWatch(event.getType());
 
     }
 
+    public void refreshAndWatch() {
+      refreshAndWatch(null); // null event type: refresh the per-replica children first, then the collection state itself
+    }
+
     /**
      * Refresh collection state from ZK and leave a watch for future changes.
      * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
      * with the results of the refresh.
      */
-    public void refreshAndWatch() {
+    public void refreshAndWatch(EventType eventType) {
       try {
+        if (eventType == null || eventType == EventType.NodeChildrenChanged) {
+          refreshAndWatchChildren();
+          if (eventType == EventType.NodeChildrenChanged) {
+            //only per-replica states modified. return
+            return;
+          }
+        }
+
         DocCollection newState = fetchCollectionState(coll, this);
         updateWatchedCollection(coll, newState);
         synchronized (getUpdateLock()) {
@@ -1246,6 +1262,32 @@ public class ZkStateReader implements SolrCloseable {
         log.error("Unwatched collection: [{}]", coll, e);
       }
     }
+
+    private void refreshAndWatchChildren() throws KeeperException, InterruptedException {
+      Stat stat = new Stat();
+      List<String> replicaStates = null;
+      try {
+        replicaStates = zkClient.getChildren(collectionPath, this, stat, true); // passes this StateWatcher as the watcher for the children
+        PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
+        DocCollection oldState = watchedCollectionStates.get(coll);
+        DocCollection newState = null;
+        if (oldState != null) {
+          newState = oldState.copyWith(newStates); // reuse the cached collection and swap in the freshly read per-replica states
+        } else {
+          newState = fetchCollectionState(coll, null); // nothing cached: fetch the collection with a null watcher
+        }
+        updateWatchedCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("updated per-replica states changed for: {}, ver: {} , new vals: {}",  coll, stat.getCversion(), replicaStates);
+        }
+
+      } catch (NoNodeException e) {
+        log.info("{} is deleted, stop watching children", collectionPath);
+      }
+    }
   }
 
   /**
@@ -1419,9 +1461,19 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
-    String collectionPath = getCollectionPath(coll);
+  public DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
+    String collectionPath  = getCollectionPath(coll);
     while (true) {
+      ClusterState.initReplicaStateProvider(() -> {
+        try {
+          PerReplicaStates replicaStates = getReplicaStates(new PerReplicaStates(collectionPath, 0, Collections.emptyList()));
+          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+          return replicaStates;
+        } catch (Exception e) {
+          //TODO
+          throw new RuntimeException(e);
+        }
+      });
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1439,6 +1491,8 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         return null;
+      } finally {
+        ClusterState.clearReplicaStateProvider();
       }
     }
   }
@@ -1566,11 +1620,26 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
+    state = updatePerReplicaState(state);
     if (stateWatcher.onStateChanged(state) == true) {
       removeDocCollectionWatcher(collection, stateWatcher);
     }
   }
 
+  private DocCollection updatePerReplicaState(DocCollection c) {
+    if (c == null || !c.isPerReplicaState()) return c; // nothing to refresh for a missing or non per-replica-state collection
+    PerReplicaStates current = c.getPerReplicaStates();
+    PerReplicaStates newPrs = PerReplicaStates.fetch(c.getZNode(), zkClient, current);
+    if (newPrs != current) { // reference comparison; presumably fetch hands back `current` itself when nothing changed - TODO confirm
+      log.debug("just-in-time update for a fresh per-replica-state {}", c.getName());
+      DocCollection modifiedColl = c.copyWith(newPrs);
+      updateWatchedCollection(c.getName(), modifiedColl);
+      return modifiedColl;
+    } else {
+      return c;
+    }
+  }
+
   /**
    * Block until a CollectionStatePredicate returns true, or the wait times out
    *
@@ -1804,7 +1873,9 @@ public class ZkStateReader implements SolrCloseable {
           break;
         }
       } else {
-        if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+        int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
+        int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
+        if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) {
           // no change to state, but we might have been triggered by the addition of a
           // state watcher, so run notifications
           updated = true;
@@ -2198,7 +2269,17 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
+  public PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException {
+    return PerReplicaStates.fetch(path, zkClient, null); // null: no previously fetched states are supplied
+
+  }
+
+  public PerReplicaStates getReplicaStates(PerReplicaStates current) throws KeeperException, InterruptedException {
+    return PerReplicaStates.fetch(current.path, zkClient, current); // fetches for the path recorded in `current`, passing it along as the existing state
+  }
+
   public DocCollection getCollection(String collection) {
-    return clusterState.getCollectionOrNull(collection);
+    return clusterState == null ? null : clusterState.getCollectionOrNull(collection); // null-safe: yields null while clusterState is unset
   }
+
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
new file mode 100644
index 0000000..25a095d
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.zookeeper.CreateMode;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestPerReplicaStates extends SolrCloudTestCase { // covers PerReplicaStates.State strings, diffing of state sets, and write ops persisted to ZooKeeper
+    @Before
+    public void prepareCluster() throws Exception {
+        configureCluster(4)
+                .configure();
+    }
+
+    @After
+    public void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    public void testBasic() {
+        PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+        assertEquals("R1:1:A", rs.asString); // string form seen here: name:version:state-letter
+
+        rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+        assertEquals("R1:1:D:L", rs.asString); // a leader gets ":L" appended
+        rs = PerReplicaStates.State.parse (rs.asString); // round-trip through the string form
+        assertEquals(State.DOWN, rs.state);
+
+    }
+
+    public void testEntries() {
+        PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+        assertEquals(2, entries.get("R1").version); // of the three R1 entries, the one with version 2 is returned
+        entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+        assertEquals(2, entries.get("R1").version); // same result with the entries listed in a different order
+        assertEquals(2, entries.get("R1").getDuplicates().size()); // the other two R1 entries are reported as duplicates
+        Set<String> modified = PerReplicaStates.findModifiedReplicas(entries,  new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+        assertEquals(1, modified.size()); // only R3's version differs between the two sets
+        assertTrue(modified.contains("R3"));
+        modified = PerReplicaStates.findModifiedReplicas( entries,
+                new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+        assertEquals(2, modified.size()); // R3 changed and R4 exists only in the second set
+        assertTrue(modified.contains("R3"));
+        assertTrue(modified.contains("R4"));
+        modified = PerReplicaStates.findModifiedReplicas( entries,
+                new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+        assertEquals(3, modified.size()); // R2, absent from the second set, is counted as modified too
+        assertTrue(modified.contains("R3"));
+        assertTrue(modified.contains("R4"));
+        assertTrue(modified.contains("R2"));
+
+
+    }
+
+    public void testReplicaStateOperations() throws Exception {
+        String root = "/testReplicaStateOperations";
+        cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+        ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+        for (String state : states) {
+            cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true); // each state string becomes a child node of root
+        }
+
+        ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+        PerReplicaStates rs = zkStateReader.getReplicaStates(new PerReplicaStates(root, 0, Collections.emptyList()));
+        assertEquals(3, rs.states.size()); // five child nodes, three distinct replica names (R1, R3, R4)
+        assertTrue(rs.cversion >= 5);
+
+        List<PerReplicaStates.Op> ops = PerReplicaStates.WriteOps.addReplica("R5",State.ACTIVE, false, rs).get();
+
+        assertEquals(1, ops.size());
+        assertEquals(PerReplicaStates.Op.Type.ADD ,ops.get(0).typ );
+        PerReplicaStates.persist(ops, root,cluster.getZkClient());
+        rs = zkStateReader.getReplicaStates(root);
+        assertEquals(4, rs.states.size());
+        assertTrue(rs.cversion >= 6);
+        assertEquals(6,  cluster.getZkClient().getChildren(root, null,true).size()); // the five initial children plus the one added for R5
+        ops =  PerReplicaStates.WriteOps.flipState("R1", State.DOWN , rs).get();
+
+        assertEquals(4, ops.size()); // one ADD followed by three DELETEs, matching the three R1 nodes created above
+        assertEquals(PerReplicaStates.Op.Type.ADD,  ops.get(0).typ);
+        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(1).typ);
+        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(2).typ);
+        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(3).typ);
+        PerReplicaStates.persist(ops, root,cluster.getZkClient());
+        rs = zkStateReader.getReplicaStates(root);
+        assertEquals(4, rs.states.size());
+        assertEquals(3, rs.states.get("R1").version); // R1 was at version 2 before the flip
+
+        ops =  PerReplicaStates.WriteOps.deleteReplica("R5" , rs).get();
+        assertEquals(1, ops.size());
+        PerReplicaStates.persist(ops, root,cluster.getZkClient());
+
+        rs = zkStateReader.getReplicaStates(root);
+        assertEquals(3, rs.states.size());
+
+        ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs).get();
+
+        assertEquals(2, ops.size());
+        assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
+        assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
+        PerReplicaStates.persist(ops, root,cluster.getZkClient());
+        rs = zkStateReader.getReplicaStates(root);
+        ops =  PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs).get();
+        assertEquals(4, ops.size()); // twice the op count of the first flip, which started with no current leader; presumably an ADD/DELETE pair each for R4 and R3 - TODO confirm
+        PerReplicaStates.persist(ops, root,cluster.getZkClient());
+        rs = zkStateReader.getReplicaStates(root);
+        assertTrue(rs.get("R3").isLeader);
+    }
+
+}