You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/01/17 01:00:51 UTC

[lucene-solr] branch jira/solr_13951 updated: merging with 8x

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr_13951
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr_13951 by this push:
     new 290ea6a  merging with 8x
290ea6a is described below

commit 290ea6a14a74eff9e9d1dd3bca1bf8188d9cf9b3
Author: noblepaul <no...@gmail.com>
AuthorDate: Sun Jan 17 12:00:23 2021 +1100

    merging with 8x
---
 .../java/org/apache/solr/cloud/ZkController.java   |  29 +-
 .../OverseerCollectionMessageHandler.java          |   1 +
 .../apache/solr/cloud/overseer/NodeMutator.java    |   6 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |   8 +-
 .../apache/solr/cloud/overseer/SliceMutator.java   |  25 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |   6 +-
 .../apache/solr/cloud/overseer/ZkWriteCommand.java |   8 +-
 solr/solr-ref-guide/src/collection-management.adoc |   3 +
 .../apache/solr/common/cloud/PerReplicaStates.java | 705 +++++++--------------
 .../solr/common/cloud/PerReplicaStatesOps.java     | 303 +++++++++
 .../apache/solr/common/cloud/ZkStateReader.java    |  10 +-
 .../resources/apispec/collections.Commands.json    |   5 +
 .../client/solrj/impl/CloudSolrClientTest.java     |  20 +-
 .../solr/common/cloud/TestPerReplicaStates.java    | 200 +++---
 14 files changed, 707 insertions(+), 622 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 28fcf73..ee4da55 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1589,12 +1589,39 @@ public class ZkController implements Closeable {
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      overseerJobQueue.offer(Utils.toJSON(m));
+      DocCollection coll = zkStateReader.getCollection(collection);
+      if (forcePublish || sendToOverseer(coll, coreNodeName)) {
+        overseerJobQueue.offer(Utils.toJSON(m));
+      } else {
+        if (log.isDebugEnabled()) {
+          log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
+        }
+        PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+        PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
+            .persist(coll.getZNode(), zkClient);
+      }
     } finally {
       MDCLoggingContext.clear();
     }
   }
 
+  /**
+   * Whether a message needs to be sent to the Overseer ({@code true}) or can bypass it ({@code false})
+   */
+  static boolean sendToOverseer(DocCollection coll, String replicaName) {
+    if (coll == null) return true;
+    Replica r = coll.getReplica(replicaName);
+    if (r == null) return true;
+    Slice shard = coll.getSlice(r.shard);
+    if (shard == null) return true;//very unlikely
+    if (shard.getState() == Slice.State.RECOVERY) return true;
+    if (shard.getParent() != null) return true;
+    for (Slice slice : coll.getSlices()) {
+      if (Objects.equals(shard.getName(), slice.getParent())) return true;
+    }
+    return false;
+  }
+
   public ZkShardTerms getShardTerms(String collection, String shardId) {
     return getCollectionTerms(collection).getShard(shardId);
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 4150842..f56a9f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -142,6 +142,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       ZkStateReader.REPLICATION_FACTOR, "1",
       ZkStateReader.NRT_REPLICAS, "1",
       ZkStateReader.TLOG_REPLICAS, "0",
+      DocCollection.PER_REPLICA_STATE, null,
       ZkStateReader.PULL_REPLICAS, "0"));
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 77f0550..e8db2b4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
 
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -80,9 +80,9 @@ public class NodeMutator {
       }
 
       if (needToUpdateCollection) {
-        if(docCollection.isPerReplicaState()) {
+        if (docCollection.isPerReplicaState()) {
           zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
-              PerReplicaStates.WriteOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+              PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
         } else {
           zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 747cf90..ecf06e4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -39,6 +39,7 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Slice;
@@ -309,9 +310,8 @@ public class ReplicaMutator {
     Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
-    Replica oldReplica = null;
     if (slice != null) {
-      oldReplica = slice.getReplica(coreNodeName);
+      Replica oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -376,9 +376,9 @@ public class ReplicaMutator {
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
     log.debug("Collection is now: {}", newCollection);
-    if(collection != null && collection.isPerReplicaState()) {
+    if (collection != null && collection.isPerReplicaState()) {
       PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
-      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStates.WriteOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+      return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
     } else{
       return new ZkWriteCommand(collectionName, newCollection);
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 0d95993..019339e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -31,6 +31,7 @@ import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.RoutingRule;
@@ -95,13 +96,14 @@ public class SliceMutator {
             ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
             ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
 
-    if(collection.isPerReplicaState()) {
+    if (collection.isPerReplicaState()) {
       PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
       return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
-              PerReplicaStates.WriteOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+          PerReplicaStatesOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
     } else {
       return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
     }
+
   }
 
   public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -127,7 +129,12 @@ public class SliceMutator {
       newSlices.put(slice.getName(), slice);
     }
 
-    return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+
+    if (coll.isPerReplicaState()) {
+      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices), PerReplicaStatesOps.deleteReplica(cnn, coll.getPerReplicaStates()) , true);
+    } else {
+      return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+    }
   }
 
   public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
@@ -154,7 +161,7 @@ public class SliceMutator {
       if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
         replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
       } else if (coreURL.equals(leaderUrl)) {
-        newLeader= replica = new ReplicaMutator(cloudManager).setLeader(replica);
+        newLeader = replica = new ReplicaMutator(cloudManager).setLeader(replica);
       }
 
       newReplicas.put(replica.getName(), replica);
@@ -163,13 +170,13 @@ public class SliceMutator {
     Map<String, Object> newSliceProps = slice.shallowCopy();
     newSliceProps.put(Slice.REPLICAS, newReplicas);
     slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
-    if(coll.isPerReplicaState()) {
+    if (coll.isPerReplicaState()) {
       PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
       return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
-              PerReplicaStates.WriteOps.flipLeader(
-                      slice.getReplicaNames(),
-                      newLeader == null ? null : newLeader.getName(),
-                      prs), false);
+          PerReplicaStatesOps.flipLeader(
+              slice.getReplicaNames(),
+              newLeader == null ? null : newLeader.getName(),
+              prs), false);
     } else {
       return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 26e100f..1ebccd2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -221,11 +221,11 @@ public class ZkStateWriter {
           DocCollection c = cmd.collection;
 
           if(cmd.ops != null && cmd.ops.isPreOp()) {
-            PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+            cmd.ops.persist(path, reader.getZkClient());
             clusterState = clusterState.copyWith(name,
                   cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
           }
-          if(!cmd.persistCollState) continue;
+          if (!cmd.persistCollState) continue;
           if (c == null) {
             // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
             log.debug("going to delete state.json {}", path);
@@ -247,7 +247,7 @@ public class ZkStateWriter {
             }
           }
             if(cmd.ops != null && !cmd.ops.isPreOp()) {
-              PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+              cmd.ops.persist(path, reader.getZkClient());
               DocCollection currentCollState = clusterState.getCollection(cmd.name);
               if ( currentCollState != null) {
                 clusterState = clusterState.copyWith(name,
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index 2f71674..39d953c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,7 +17,7 @@
 package org.apache.solr.cloud.overseer;
 
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
 
 public class ZkWriteCommand {
 
@@ -27,9 +27,9 @@ public class ZkWriteCommand {
   public final boolean noop;
   // persist the collection state. If this is false, it means the collection state is not modified
   public final boolean persistCollState;
-  public final PerReplicaStates.WriteOps ops;
+  public final PerReplicaStatesOps ops;
 
-  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStates.WriteOps replicaOps, boolean persistCollState) {
+  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
     boolean isPerReplicaState = collection.isPerReplicaState();
     this.name = name;
     this.collection = collection;
@@ -43,7 +43,7 @@ public class ZkWriteCommand {
     this.noop = false;
     persistCollState = true;
     this.ops = collection != null && collection.isPerReplicaState() ?
-        PerReplicaStates.WriteOps.touchChildren():
+        PerReplicaStatesOps.touchChildren():
         null;
   }
 
diff --git a/solr/solr-ref-guide/src/collection-management.adoc b/solr/solr-ref-guide/src/collection-management.adoc
index 14e32b6..e1ee0b4 100644
--- a/solr/solr-ref-guide/src/collection-management.adoc
+++ b/solr/solr-ref-guide/src/collection-management.adoc
@@ -87,6 +87,9 @@ If this parameter is specified, the router will look at the value of the field i
 +
 Please note that <<realtime-get.adoc#realtime-get,RealTime Get>> or retrieval by document ID would also require the parameter `\_route_` (or `shard.keys`) to avoid a distributed search.
 
+`perReplicaState`::
+If `true`, the state of each replica is maintained as an individual child node of `state.json`. The default is `false`.
+
 `property._name_=_value_`::
 Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values.
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index 3eb2ca2..af4bb43 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -36,14 +36,11 @@ import org.apache.solr.common.annotation.JsonProperty;
 import org.apache.solr.common.util.ReflectMapWriter;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.WrappedSimpleMap;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Collections.singletonList;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.params.CommonParams.VERSION;
 
@@ -52,536 +49,264 @@ import static org.apache.solr.common.params.CommonParams.VERSION;
  * This is an immutable object. When states are modified, a new instance is constructed
  */
 public class PerReplicaStates implements ReflectMapWriter {
-    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-    public static final char SEPARATOR = ':';
-
-
-    @JsonProperty
-    public final String path;
-
-    @JsonProperty
-    public final int cversion;
-
-    @JsonProperty
-    public final SimpleMap<State> states;
-
-    public PerReplicaStates(String path, int cversion, List<String> states) {
-        this.path = path;
-        this.cversion = cversion;
-        Map<String, State> tmp = new LinkedHashMap<>();
-
-        for (String state : states) {
-            State rs = State.parse(state);
-            if (rs == null) continue;
-            State existing = tmp.get(rs.replica);
-            if (existing == null) {
-                tmp.put(rs.replica, rs);
-            } else {
-                tmp.put(rs.replica, rs.insert(existing));
-            }
-        }
-        this.states = new WrappedSimpleMap<>(tmp);
-
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  // number of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  // znode path from which this is loaded
+  @JsonProperty
+  public final String path;
+
+  // the child version of that znode
+  @JsonProperty
+  public final int cversion;
+
+  //states of individual replicas
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
     }
+    this.states = new WrappedSimpleMap<>(tmp);
 
-    /**Get the changed replicas
-     */
-    public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
-        Set<String> result = new HashSet<>();
-        if(fresh == null) {
-            old.states.forEachKey(result::add);
-            return result;
-        }
-        old.states.forEachEntry((s, state) -> {
-            //the state is modified or missing
-            if(!Objects.equals(fresh.get(s) , state)) result.add(s);
-        });
-        fresh.states.forEachEntry((s, state) -> { if(old.get(s) == null ) result.add(s);
-        });
-        return result;
-    }
+  }
 
-    /**
-     * This is a persist operation with retry if a write fails due to stale state
-     */
-    public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
-        try {
-            persist(ops.get(), znode, zkClient);
-        } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
-            //state is stale
-            log.info("stale state for {} . retrying...", znode);
-            List<Op> freshOps = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
-            persist(freshOps, znode, zkClient);
-            log.info("retried for stale state {}, succeeded", znode);
-        }
+  /** Get the names of replicas that were modified, removed, or added between {@code old} and {@code fresh}
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
     }
-
-    /**
-     * Persist a set of operations to Zookeeper
-     */
-    public static void persist(List<Op> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
-        if (operations == null || operations.isEmpty()) return;
-        log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
-
-        List<org.apache.zookeeper.Op> ops = new ArrayList<>(operations.size());
-        for (Op op : operations) {
-            //the state of the replica is being updated
-            String path = znode + "/" + op.state.asString;
-            List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
-            ops.add(op.typ == Op.Type.ADD ?
-                    org.apache.zookeeper.Op.create(path, null, acls, CreateMode.PERSISTENT) :
-                    org.apache.zookeeper.Op.delete(path, -1));
-        }
-        try {
-            zkClient.multi(ops, true);
-            if (log.isDebugEnabled()) {
-                //nocommit
-                try {
-                    Stat stat = zkClient.exists(znode, null, true);
-                    log.debug("After update, cversion : {}", stat.getCversion());
-                } catch (Exception e) {
-                }
-
-            }
-        } catch (KeeperException e) {
-            log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
-            throw e;
-        }
-
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates}. It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If it has not been modified, the same object is returned.
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modified
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
     }
+  }
 
 
-    /**
-     * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
-     * If this is not modified, the same object is returned
-     */
-    public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
-        try {
-            if (current != null) {
-                Stat stat = zkClient.exists(current.path, null, true);
-                if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
-                if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
-            }
-            Stat stat = new Stat();
-            List<String> children = zkClient.getChildren(path, null, stat, true);
-            return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
-        } catch (KeeperException e) {
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
-        } catch (InterruptedException e) {
-            SolrZkClient.checkInterrupted(e);
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
-        }
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
     }
+    return null;
+  }
 
+  public State get(String replica) {
+    return states.get(replica);
+  }
 
-    private static List<Op> addDeleteStaleNodes(List<Op> ops, State rs) {
-        while (rs != null) {
-            ops.add(new Op(Op.Type.DELETE, rs));
-            rs = rs.duplicate;
-        }
-        return ops;
-    }
-
-    public static String getReplicaName(String s) {
-        int idx = s.indexOf(SEPARATOR);
-        if (idx > 0) {
-            return s.substring(0, idx);
-        }
-        return null;
-    }
+  public static class Operation {
+    public final Type typ;
+    public final State state;
 
-    public State get(String replica) {
-        return states.get(replica);
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
     }
 
-    public static class Op {
-        public final Type typ;
-        public final State state;
-
-        public Op(Type typ, State replicaState) {
-            this.typ = typ;
-            this.state = replicaState;
-        }
-
-
-        public enum Type {
-            //add a new node
-            ADD,
-            //delete an existing node
-            DELETE
-        }
 
-        @Override
-        public String toString() {
-            return typ.toString() + " : " + state;
-        }
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
     }
 
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
 
-    /**
-     * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
-     */
-    public static class State implements MapWriter {
-
-        public final String replica;
-
-        public final Replica.State state;
-
-        public final Boolean isLeader;
-
-        public final int version;
-
-        public final String asString;
-
-        /**
-         * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
-         * <p>
-         * the entry with '13' is the latest and the one with '12' is considered a duplicate
-         * <p>
-         * These are unlikely, but possible
-         */
-        final State duplicate;
-
-        private State(String serialized, List<String> pieces) {
-            this.asString = serialized;
-            replica = pieces.get(0);
-            version = Integer.parseInt(pieces.get(1));
-            String encodedStatus = pieces.get(2);
-            this.state = Replica.getState(encodedStatus);
-            isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
-            duplicate = null;
-        }
-
-        public static State parse(String serialized) {
-            List<String> pieces = StrUtils.splitSmart(serialized, ':');
-            if (pieces.size() < 3) return null;
-            return new State(serialized, pieces);
-
-        }
-
-        public State(String replica, Replica.State state, Boolean isLeader, int version) {
-            this(replica, state, isLeader, version, null);
-        }
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
+    appendStates(sb);
+    return sb.append("]}").toString();
+  }
 
-        public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
-            this.replica = replica;
-            this.state = state == null ? Replica.State.ACTIVE : state;
-            this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
-            this.version = version;
-            asString = serialize();
-            this.duplicate = duplicate;
-        }
+  private StringBuilder appendStates(StringBuilder sb) {
+    states.forEachEntry(new BiConsumer<String, State>() {
+      int count = 0;
 
-        @Override
-        public void writeMap(EntryWriter ew) throws IOException {
-            ew.put(NAME, replica);
-            ew.put(VERSION, version);
-            ew.put(ZkStateReader.STATE_PROP, state.toString());
-            if (isLeader) ew.put(Slice.LEADER, isLeader);
-            ew.putIfNotNull("duplicate", duplicate);
-        }
+      @Override
+      public void accept(String s, State state) {
+        if (count++ > 0) sb.append(", ");
+        sb.append(state.asString);
+        for (State d : state.getDuplicates()) sb.append(d.asString);
+      }
+    });
+    return sb;
+  }
 
-        private State insert(State duplicate) {
-            assert this.replica.equals(duplicate.replica);
-            if (this.version >= duplicate.version) {
-                if (this.duplicate != null) {
-                    duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
-                }
-                return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
-            } else {
-                return duplicate.insert(this);
-            }
-        }
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
 
-        /**
-         * fetch duplicates entries for this replica
-         */
-        List<State> getDuplicates() {
-            if (duplicate == null) return Collections.emptyList();
-            List<State> result = new ArrayList<>();
-            State current = duplicate;
-            while (current != null) {
-                result.add(current);
-                current = current.duplicate;
-            }
-            return result;
-        }
+    public final String replica;
 
-        private String serialize() {
-            StringBuilder sb = new StringBuilder(replica)
-                    .append(":")
-                    .append(version)
-                    .append(":")
-                    .append(state.shortName);
-            if (isLeader) sb.append(":").append("L");
-            return sb.toString();
-        }
+    public final Replica.State state;
 
+    public final Boolean isLeader;
 
-        @Override
-        public String toString() {
-            return asString;
-        }
+    public final int version;
 
-        @Override
-        public boolean equals(Object o) {
-            if (o instanceof State) {
-                State that = (State) o;
-                return Objects.equals(this.asString, that.asString);
-            }
-            return false;
-        }
+    public final String asString;
 
-        @Override
-        public int hashCode() {
-            return asString.hashCode();
-        }
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;
+
+    private State(String serialized, List<String> pieces) {
+      this.asString = serialized;
+      replica = pieces.get(0);
+      version = Integer.parseInt(pieces.get(1));
+      String encodedStatus = pieces.get(2);
+      this.state = Replica.getState(encodedStatus);
+      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+      duplicate = null;
     }
 
+    public static State parse(String serialized) {
+      List<String> pieces = StrUtils.splitSmart(serialized, ':');
+      if (pieces.size() < 3) return null;
+      return new State(serialized, pieces);
 
-    public static abstract class WriteOps {
-        private PerReplicaStates rs;
-        List<Op> ops;
-        private boolean preOp = true;
-
-        /**
-         * state of a replica is changed
-         *
-         * @param newState the new state
-         */
-        public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
-            return new WriteOps() {
-                @Override
-                protected List<Op> refresh(PerReplicaStates rs) {
-                    List<Op> ops = new ArrayList<>(2);
-                    State existing = rs.get(replica);
-                    if (existing == null) {
-                        ops.add(new Op(Op.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
-                    } else {
-                        ops.add(new Op(Op.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
-                        addDeleteStaleNodes(ops, existing);
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
-                    }
-                    return ops;
-                }
-            }.init(rs);
-        }
-
-        public PerReplicaStates getPerReplicaStates() {
-            return rs;
-        }
-
-
-        /**Switch a collection from/to perReplicaState=true
-         */
-        public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
-            return new WriteOps() {
-                @Override
-                List<Op> refresh(PerReplicaStates prs) {
-                    return enable ? enable(coll) : disable(prs);
-                }
-
-                List<Op> enable(DocCollection coll) {
-                    List<Op> result = new ArrayList<>();
-                    coll.forEachReplica((s, r) -> result.add(new Op(Op.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
-                    return result;
-                }
-
-                List<Op> disable(PerReplicaStates prs) {
-                    List<Op> result = new ArrayList<>();
-                    prs.states.forEachEntry((s, state) -> result.add(new Op(Op.Type.DELETE, state)));
-                    return result;
-                }
-            }.init(prs);
-
-        }
-
-        /**
-         * Flip the leader replica to a new one
-         *
-         * @param allReplicas  allReplicas of the shard
-         * @param next next leader
-         */
-        public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
-            return new WriteOps() {
-
-                @Override
-                protected List<Op> refresh(PerReplicaStates rs) {
-                    List<Op> ops = new ArrayList<>(4);
-                    if(next != null) {
-                        State st = rs.get(next);
-                        if (st != null) {
-                            if (!st.isLeader) {
-                                ops.add(new Op(Op.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
-                                ops.add(new Op(Op.Type.DELETE, st));
-                            }
-                            //else do not do anything , that node is the leader
-                        } else {
-                            //there is no entry for the new leader.
-                            //create one
-                            ops.add(new Op(Op.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
-                        }
-                    }
-
-                    //now go through all other replicas and unset previous leader
-                    for (String r : allReplicas) {
-                        State st = rs.get(r);
-                        if (st == null) continue;//unlikely
-                        if (!Objects.equals(r, next)) {
-                            if(st.isLeader) {
-                                //some other replica is the leader now. unset
-                                ops.add(new Op(Op.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
-                                ops.add(new Op(Op.Type.DELETE, st));
-                            }
-                        }
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
-                    }
-                    return ops;
-                }
-
-            }.init(rs);
-        }
-
-        /**
-         * Delete a replica entry from per-replica states
-         *
-         * @param replica name of the replica to be deleted
-         */
-        public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
-            return new WriteOps() {
-                @Override
-                protected List<Op> refresh(PerReplicaStates rs) {
-                    List<Op> result;
-                    if (rs == null) {
-                        result = Collections.emptyList();
-                    } else {
-                        State state = rs.get(replica);
-                        result = addDeleteStaleNodes(new ArrayList<>(), state);
-                    }
-                    return result;
-                }
-            }.init(rs);
-        }
+    }
 
-        public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
-            return new WriteOps() {
-                @Override
-                protected List<Op> refresh(PerReplicaStates rs) {
-                    return singletonList(new Op(Op.Type.ADD,
-                            new State(replica, state, isLeader, 0)));
-                }
-            }.init(rs);
-        }
+    public State(String replica, Replica.State state, Boolean isLeader, int version) {
+      this(replica, state, isLeader, version, null);
+    }
 
-        /**
-         * mark a bunch of replicas as DOWN
-         */
-        public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
-            return new WriteOps() {
-                @Override
-                List<Op> refresh(PerReplicaStates rs) {
-                    List<Op> ops = new ArrayList<>();
-                    for (String replica : replicas) {
-                        State r = rs.get(replica);
-                        if (r != null) {
-                            if (r.state == Replica.State.DOWN && !r.isLeader) continue;
-                            ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
-                            addDeleteStaleNodes(ops, r);
-                        } else {
-                            ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
-                        }
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
-                    }
-                    return ops;
-                }
-            }.init(rs);
-        }
+    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+      this.replica = replica;
+      this.state = state == null ? Replica.State.ACTIVE : state;
+      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+      this.version = version;
+      asString = serialize();
+      this.duplicate = duplicate;
+    }
 
-        /**
-         * Just creates and deletes a summy entry so that the {@link Stat#getCversion()} of states.json
-         * is updated
-         */
-        public static WriteOps touchChildren() {
-            WriteOps result = new WriteOps() {
-                @Override
-                List<Op> refresh(PerReplicaStates rs) {
-                    List<Op> ops = new ArrayList<>();
-                    State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
-                    ops.add(new Op(Op.Type.ADD, st));
-                    ops.add(new Op(Op.Type.DELETE, st));
-                    if (log.isDebugEnabled()) {
-                        log.debug("touchChildren {}", ops);
-                    }
-                    return ops;
-                }
-            };
-            result.preOp = false;
-            result.ops = result.refresh(null);
-            return result;
-        }
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put(NAME, replica);
+      ew.put(VERSION, version);
+      ew.put(ZkStateReader.STATE_PROP, state.toString());
+      if (isLeader) ew.put(Slice.LEADER, isLeader);
+      ew.putIfNotNull("duplicate", duplicate);
+    }
 
-        WriteOps init(PerReplicaStates rs) {
-            if (rs == null) return null;
-            get(rs);
-            return this;
+    private State insert(State duplicate) {
+      assert this.replica.equals(duplicate.replica);
+      if (this.version >= duplicate.version) {
+        if (this.duplicate != null) {
+          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
         }
+        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+      } else {
+        return duplicate.insert(this);
+      }
+    }
 
-        public List<Op> get() {
-            return ops;
-        }
+    /**
+     * fetch duplicates entries for this replica
+     */
+    List<State> getDuplicates() {
+      if (duplicate == null) return Collections.emptyList();
+      List<State> result = new ArrayList<>();
+      State current = duplicate;
+      while (current != null) {
+        result.add(current);
+        current = current.duplicate;
+      }
+      return result;
+    }
 
-        public List<Op> get(PerReplicaStates rs) {
-            ops = refresh(rs);
-            if (ops == null) ops = Collections.emptyList();
-            this.rs = rs;
-            return ops;
-        }
+    private String serialize() {
+      StringBuilder sb = new StringBuilder(replica)
+          .append(":")
+          .append(version)
+          .append(":")
+          .append(state.shortName);
+      if (isLeader) sb.append(":").append("L");
+      return sb.toString();
+    }
 
-        /**
-         * To be executed before collection state.json is persisted
-         */
-        public boolean isPreOp() {
-            return preOp;
-        }
 
-        /**
-         * if a multi operation fails because the state got modified from behind,
-         * refresh the operation and try again
-         *
-         * @param prs The new state
-         */
-        abstract List<Op> refresh(PerReplicaStates prs);
-
-        @Override
-        public String toString() {
-            return ops.toString();
-        }
+    @Override
+    public String toString() {
+      return asString;
     }
 
     @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
-        appendStates(sb);
-        return sb.append("]}").toString();
+    public boolean equals(Object o) {
+      if (o instanceof State) {
+        State that = (State) o;
+        return Objects.equals(this.asString, that.asString);
+      }
+      return false;
     }
 
-    private StringBuilder appendStates(StringBuilder sb) {
-        states.forEachEntry(new BiConsumer<String, State>() {
-            int count = 0;
-            @Override
-            public void accept(String s, State state) {
-                if(count++ > 0) sb.append(", ");
-                sb.append(state.asString);
-                for (State d : state.getDuplicates()) sb.append(d.asString);
-            }
-        });
-        return sb;
+    @Override
+    public int hashCode() {
+      return asString.hashCode();
     }
+  }
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
new file mode 100644
index 0000000..9e440f9
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states
+ * Do not directly manipulate the per replica states as it can become difficult to debug them
+ */
+public class PerReplicaStatesOps {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private PerReplicaStates rs;
+    List<PerReplicaStates.Operation> ops;
+    private boolean preOp = true;
+    final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
+
+    PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
+        this.fun = fun;
+    }
+
+    /**
+     * Persist a set of operations to Zookeeper
+     */
+    private void persist(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+        if (operations == null || operations.isEmpty()) return;
+        if (log.isDebugEnabled()) {
+            log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+        }
+
+        List<Op> ops = new ArrayList<>(operations.size());
+        for (PerReplicaStates.Operation op : operations) {
+            //the state of the replica is being updated
+            String path = znode + "/" + op.state.asString;
+            ops.add(op.typ == PerReplicaStates.Operation.Type.ADD ?
+                    Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
+                    Op.delete(path, -1));
+        }
+        try {
+            zkClient.multi(ops, true);
+        } catch (KeeperException e) {
+            log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
+            throw e;
+        }
+
+    }
+
+    /** A replica may have leftover (duplicate) entries; add a DELETE operation for each of them too.
+     */
+    private static List<PerReplicaStates.Operation> addDeleteStaleNodes(List<PerReplicaStates.Operation> ops, PerReplicaStates.State rs) {
+        while (rs != null) {
+            ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, rs));
+            rs = rs.duplicate;
+        }
+        return ops;
+    }
+
+    /**
+     * This is a persist operation with retry if a write fails due to stale state
+     */
+    public void persist(String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+        List<PerReplicaStates.Operation> operations = ops;
+        for (int i = 0; i < PerReplicaStates.MAX_RETRIES; i++) {
+            try {
+                persist(operations, znode, zkClient);
+                return;
+            } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
+                //state is stale
+                if(log.isInfoEnabled()) {
+                    log.info("stale state for {} , attempt: {}. retrying...", znode, i);
+                }
+                operations = refresh(PerReplicaStates.fetch(znode, zkClient, null));
+            }
+        }
+    }
+
+    public PerReplicaStates getPerReplicaStates() {
+        return rs;
+    }
+
+    /**
+     * Change the state of a replica, keeping its current leader flag.
+     * Operations are derived from the state passed to the function so that a retry uses fresh data.
+     * @param newState the new state
+     */
+    public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(prs -> {
+            List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
+            PerReplicaStates.State existing = prs.get(replica);
+            if (existing == null) {
+                operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
+            } else {
+                operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, existing.isLeader, existing.version + 1)));
+                addDeleteStaleNodes(operations, existing);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
+            }
+            return operations;
+        }).init(rs);
+    }
+
+    /**
+     * Switch a collection from/to perReplicaState=true
+     */
+    public static PerReplicaStatesOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(prs -> enable ? enable(coll) : disable(prs)).init(rs);
+
+    }
+
+    private static List<PerReplicaStates.Operation> enable(DocCollection coll) {
+        List<PerReplicaStates.Operation> result = new ArrayList<>();
+        coll.forEachReplica((s, r) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(r.getName(), r.getState(), r.isLeader(), 0))));
+        return result;
+    }
+
+    private static List<PerReplicaStates.Operation> disable(PerReplicaStates prs) {
+        List<PerReplicaStates.Operation> result = new ArrayList<>();
+        prs.states.forEachEntry((s, state) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, state)));
+        return result;
+    }
+
+    /**
+     * Flip the leader replica to a new one.
+     * Operations are derived from the state passed to the function so that a retry uses fresh data.
+     * @param allReplicas allReplicas of the shard
+     * @param next        next leader
+     */
+    public static PerReplicaStatesOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(prs -> {
+            List<PerReplicaStates.Operation> ops = new ArrayList<>();
+            if (next != null) {
+                PerReplicaStates.State st = prs.get(next);
+                if (st != null) {
+                    if (!st.isLeader) {
+                        ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+                        ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+                    }
+                    //else do not do anything , that node is the leader
+                } else {
+                    //there is no entry for the new leader.
+                    //create one
+                    ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+                }
+            }
+
+            //now go through all other replicas and unset previous leader
+            for (String r : allReplicas) {
+                PerReplicaStates.State st = prs.get(r);
+                if (st == null) continue;//unlikely
+                if (!Objects.equals(r, next)) {
+                    if (st.isLeader) {
+                        //some other replica is the leader now. unset
+                        ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+                        ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+                    }
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
+            }
+            return ops;
+        }).init(rs);
+    }
+
+    /**
+     * Delete a replica entry (and any leftover duplicates) from per-replica states.
+     * The entry is looked up in the state passed to the function so that a retry uses fresh data.
+     * @param replica name of the replica to be deleted
+     */
+    public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(prs -> {
+            List<PerReplicaStates.Operation> result;
+            if (prs == null) {
+                result = Collections.emptyList();
+            } else {
+                PerReplicaStates.State state = prs.get(replica);
+                result = addDeleteStaleNodes(new ArrayList<>(), state);
+            }
+            return result;
+        }).init(rs);
+    }
+
+    public static PerReplicaStatesOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(perReplicaStates -> singletonList(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD,
+                new PerReplicaStates.State(replica, state, isLeader, 0)))).init(rs);
+    }
+
+    /**
+     * Mark a bunch of replicas as DOWN and non-leader, based on the state passed to the function
+     */
+    public static PerReplicaStatesOps downReplicas(List<String> replicas, PerReplicaStates rs) {
+        return new PerReplicaStatesOps(prs -> {
+            List<PerReplicaStates.Operation> operations = new ArrayList<>();
+            for (String replica : replicas) {
+                PerReplicaStates.State r = prs.get(replica);
+                if (r != null) {
+                    if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+                    operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+                    addDeleteStaleNodes(operations, r);
+                } else {
+                    operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
+            }
+            return operations;
+        }).init(rs);
+    }
+
+    /**
+     * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of state.json
+     * is updated
+     */
+    public static PerReplicaStatesOps touchChildren() {
+        PerReplicaStatesOps result = new PerReplicaStatesOps(prs -> {
+            List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
+            PerReplicaStates.State st = new PerReplicaStates.State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+            operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, st));
+            operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+            if (log.isDebugEnabled()) {
+                log.debug("touchChildren {}", operations);
+            }
+            return operations;
+        });
+        result.preOp = false;
+        result.ops = result.refresh(null);
+        return result;
+    }
+
+    PerReplicaStatesOps init(PerReplicaStates rs) {
+        if (rs == null) return null;
+        get(rs);
+        return this;
+    }
+
+    public List<PerReplicaStates.Operation> get() {
+        return ops;
+    }
+
+    public List<PerReplicaStates.Operation> get(PerReplicaStates rs) {
+        ops = refresh(rs);
+        if (ops == null) ops = Collections.emptyList();
+        this.rs = rs;
+        return ops;
+    }
+
+    /**
+     * To be executed before collection state.json is persisted
+     */
+    public boolean isPreOp() {
+        return preOp;
+    }
+
+    /**
+     * This method should compute the set of ZK operations for a given action
+     * for instance, a state change may result in 2 operations on per-replica states (1 CREATE and 1 DELETE)
+     * if a multi operation fails because the state got modified from behind,
+     * refresh the operation and try again
+     *
+     * @param prs The latest state
+     */
+    List<PerReplicaStates.Operation> refresh(PerReplicaStates prs) {
+        if (fun != null) return fun.apply(prs);
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return ops.toString();
+    }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 118dc05..9960e34 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -672,12 +672,12 @@ public class ZkStateReader implements SolrCloseable {
       if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
         boolean shouldFetch = true;
         if (cachedDocCollection != null) {
-          Stat exists = null;
+          Stat freshStats = null;
           try {
-            exists = zkClient.exists(getCollectionPath(collName), null, true);
+            freshStats = zkClient.exists(getCollectionPath(collName), null, true);
           } catch (Exception e) {
           }
-          if (exists != null && !cachedDocCollection.isModified(exists.getVersion(), exists.getCversion())) {
+          if (freshStats != null && !cachedDocCollection.isModified(freshStats.getVersion(), freshStats.getCversion())) {
             shouldFetch = false;
           }
         }
@@ -855,14 +855,16 @@ public class ZkStateReader implements SolrCloseable {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
-
+    AtomicReference<DocCollection> coll = new AtomicReference<>();
     AtomicReference<Replica> leader = new AtomicReference<>();
     try {
       waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
         if (c == null)
           return false;
+        coll.set(c);
         Replica l = getLeader(n, c, shard);
         if (l != null) {
+          log.debug("leader found for {}/{} to be {}", collection, shard, l);
           leader.set(l);
           return true;
         }
diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json
index c3abc60..747cc8b 100644
--- a/solr/solrj/src/resources/apispec/collections.Commands.json
+++ b/solr/solrj/src/resources/apispec/collections.Commands.json
@@ -90,6 +90,11 @@
           "type": "boolean",
           "description": "If true then request will complete only when all affected replicas become active.",
           "default": false
+        },
+        "perReplicaState": {
+          "type": "boolean",
+          "description": "Use Per replica states",
+          "default": false
         }
       },
       "required": [
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 6d6afc6..f655b21 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
@@ -83,6 +84,8 @@ import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
 
 /**
  * This test would be faster if we simulated the zk state instead.
@@ -239,7 +242,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
   @Test
   public void testRouting() throws Exception {
-    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1)
+        .setPerReplicaState(USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
     cluster.waitForActiveCollection("routing_collection", 2, 2);
     
     AbstractUpdateRequest request = new UpdateRequest()
@@ -1055,7 +1060,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
         .process(cluster.getSolrClient());
 
-    final String testCollection = "perReplicaState_test";
+    String testCollection = "perReplicaState_test";
     int liveNodes = cluster.getJettySolrRunners().size();
     CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
         .setPerReplicaState(Boolean.TRUE)
@@ -1068,6 +1073,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
     PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
     assertEquals(4, prs.states.size());
+    testCollection = "perReplicaState_testv2";
+    new V2Request.Builder("/collections")
+        .withMethod(POST)
+        .withPayload("{create: {name: perReplicaState_testv2, config : conf, numShards : 2, nrtReplicas : 2, perReplicaState : true, maxShardsPerNode : 5}}")
+        .build()
+        .process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(testCollection, 2, 4);
+    c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+    c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+    prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
+    assertEquals(4, prs.states.size());
   }
 
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
index 25a095d..b6ea6f7 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -18,8 +18,6 @@
 package org.apache.solr.common.cloud;
 
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
@@ -31,107 +29,105 @@ import org.junit.After;
 import org.junit.Before;
 
 public class TestPerReplicaStates extends SolrCloudTestCase {
-    @Before
-    public void prepareCluster() throws Exception {
-        configureCluster(4)
-                .configure();
+  @Before
+  public void prepareCluster() throws Exception {
+    configureCluster(4)
+        .configure();
+  }
+
+  @After
+  public void tearDownCluster() throws Exception {
+    shutdownCluster();
+  }
+
+  public void testBasic() {
+    PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+    assertEquals("R1:1:A", rs.asString);
+
+    rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+    assertEquals("R1:1:D:L", rs.asString);
+    rs = PerReplicaStates.State.parse (rs.asString);
+    assertEquals(State.DOWN, rs.state);
+
+  }
+
+  public void testEntries() {
+    PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+    assertEquals(2, entries.get("R1").version);
+    entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+    assertEquals(2, entries.get("R1").version);
+    assertEquals(2, entries.get("R1").getDuplicates().size());
+    Set<String> modified = PerReplicaStates.findModifiedReplicas(entries,  new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+    assertEquals(1, modified.size());
+    assertTrue(modified.contains("R3"));
+    modified = PerReplicaStates.findModifiedReplicas( entries,
+        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+    assertEquals(2, modified.size());
+    assertTrue(modified.contains("R3"));
+    assertTrue(modified.contains("R4"));
+    modified = PerReplicaStates.findModifiedReplicas( entries,
+        new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+    assertEquals(3, modified.size());
+    assertTrue(modified.contains("R3"));
+    assertTrue(modified.contains("R4"));
+    assertTrue(modified.contains("R2"));
+
+
+  }
+
+  public void testReplicaStateOperations() throws Exception {
+    String root = "/testReplicaStateOperations";
+    cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+    ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+    for (String state : states) {
+      cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
     }
 
-    @After
-    public void tearDownCluster() throws Exception {
-        shutdownCluster();
-    }
-
-    public void testBasic() {
-        PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
-        assertEquals("R1:1:A", rs.asString);
-
-        rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
-        assertEquals("R1:1:D:L", rs.asString);
-        rs = PerReplicaStates.State.parse (rs.asString);
-        assertEquals(State.DOWN, rs.state);
-
-    }
-
-    public void testEntries() {
-        PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
-        assertEquals(2, entries.get("R1").version);
-        entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
-        assertEquals(2, entries.get("R1").version);
-        assertEquals(2, entries.get("R1").getDuplicates().size());
-        Set<String> modified = PerReplicaStates.findModifiedReplicas(entries,  new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
-        assertEquals(1, modified.size());
-        assertTrue(modified.contains("R3"));
-        modified = PerReplicaStates.findModifiedReplicas( entries,
-                new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
-        assertEquals(2, modified.size());
-        assertTrue(modified.contains("R3"));
-        assertTrue(modified.contains("R4"));
-        modified = PerReplicaStates.findModifiedReplicas( entries,
-                new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
-        assertEquals(3, modified.size());
-        assertTrue(modified.contains("R3"));
-        assertTrue(modified.contains("R4"));
-        assertTrue(modified.contains("R2"));
-
-
-    }
-
-    public void testReplicaStateOperations() throws Exception {
-        String root = "/testReplicaStateOperations";
-        cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
-
-        ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
-
-        for (String state : states) {
-            cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
-        }
-
-        ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
-        PerReplicaStates rs = zkStateReader.getReplicaStates(new PerReplicaStates(root, 0, Collections.emptyList()));
-        assertEquals(3, rs.states.size());
-        assertTrue(rs.cversion >= 5);
-
-        List<PerReplicaStates.Op> ops = PerReplicaStates.WriteOps.addReplica("R5",State.ACTIVE, false, rs).get();
-
-        assertEquals(1, ops.size());
-        assertEquals(PerReplicaStates.Op.Type.ADD ,ops.get(0).typ );
-        PerReplicaStates.persist(ops, root,cluster.getZkClient());
-        rs = zkStateReader.getReplicaStates(root);
-        assertEquals(4, rs.states.size());
-        assertTrue(rs.cversion >= 6);
-        assertEquals(6,  cluster.getZkClient().getChildren(root, null,true).size());
-        ops =  PerReplicaStates.WriteOps.flipState("R1", State.DOWN , rs).get();
-
-        assertEquals(4, ops.size());
-        assertEquals(PerReplicaStates.Op.Type.ADD,  ops.get(0).typ);
-        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(1).typ);
-        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(2).typ);
-        assertEquals(PerReplicaStates.Op.Type.DELETE,  ops.get(3).typ);
-        PerReplicaStates.persist(ops, root,cluster.getZkClient());
-        rs = zkStateReader.getReplicaStates(root);
-        assertEquals(4, rs.states.size());
-        assertEquals(3, rs.states.get("R1").version);
-
-        ops =  PerReplicaStates.WriteOps.deleteReplica("R5" , rs).get();
-        assertEquals(1, ops.size());
-        PerReplicaStates.persist(ops, root,cluster.getZkClient());
-
-        rs = zkStateReader.getReplicaStates(root);
-        assertEquals(3, rs.states.size());
-
-        ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs).get();
-
-        assertEquals(2, ops.size());
-        assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
-        assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
-        PerReplicaStates.persist(ops, root,cluster.getZkClient());
-        rs = zkStateReader.getReplicaStates(root);
-        ops =  PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs).get();
-        assertEquals(4, ops.size());
-        PerReplicaStates.persist(ops, root,cluster.getZkClient());
-        rs = zkStateReader.getReplicaStates(root);
-        assertTrue(rs.get("R3").isLeader);
-    }
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    PerReplicaStates rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    assertEquals(3, rs.states.size());
+    assertTrue(rs.cversion >= 5);
+
+    PerReplicaStatesOps ops = PerReplicaStatesOps.addReplica("R5",State.ACTIVE, false, rs);
+    assertEquals(1, ops.get().size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD , ops.ops.get(0).typ );
+    ops.persist(root,cluster.getZkClient());
+    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    assertEquals(4, rs.states.size());
+    assertTrue(rs.cversion >= 6);
+    assertEquals(6,  cluster.getZkClient().getChildren(root, null,true).size());
+    ops =  PerReplicaStatesOps.flipState("R1", State.DOWN , rs);
+
+    assertEquals(4, ops.ops.size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD,  ops.ops.get(0).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(1).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(2).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE,  ops.ops.get(3).typ);
+    ops.persist(root, cluster.getZkClient());
+    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    assertEquals(4, rs.states.size());
+    assertEquals(3, rs.states.get("R1").version);
+
+    ops =  PerReplicaStatesOps.deleteReplica("R5" , rs);
+    assertEquals(1, ops.ops.size());
+    ops.persist(root,cluster.getZkClient());
+
+    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    assertEquals(3, rs.states.size());
+
+    ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs);
+    assertEquals(2, ops.ops.size());
+    assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
+    assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
+    ops.persist(root,cluster.getZkClient());
+    rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    ops =  PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs);
+    assertEquals(4, ops.ops.size());
+    ops.persist(root,cluster.getZkClient());
+    rs =PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+    assertTrue(rs.get("R3").isLeader);
+  }
 
 }