You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@lucene.apache.org by GitBox <gi...@apache.org> on 2021/01/08 01:15:47 UTC

[GitHub] [lucene-solr] noblepaul opened a new pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

noblepaul opened a new pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] noblepaul commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
noblepaul commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554294274



##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;

Review comment:
       If an old version of a given replica's state is not deleted, that leftover entry is considered a duplicate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] noblepaul commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
noblepaul commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554585836



##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -174,20 +215,30 @@ public boolean hasPendingUpdates() {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if (!hasPendingUpdates()) return clusterState;
+    if ((updates == this.updates)
+        && !hasPendingUpdates()) {
+      return clusterState;
+    }
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          DocCollection c = entry.getValue();
+          ZkWriteCommand cmd = entry.getValue();
+          DocCollection c = cmd.collection;
 
+          if (cmd.ops != null && cmd.ops.isPreOp()) {
+            cmd.ops.persist(path, reader.getZkClient());
+            clusterState = clusterState.copyWith(name,
+                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+          }
+          if (!cmd.persistCollState) continue;

Review comment:
       A preOp is an operation that must run before state.json is updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] noblepaul commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
noblepaul commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554294274



##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;

Review comment:
       If an old version of a given replica's state is not deleted, that leftover entry is considered a duplicate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] madrob commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
madrob commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554164613



##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -64,31 +64,8 @@
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ConnectionManager;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocCollectionWatcher;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.common.cloud.NodesSysPropsCacher;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.*;

Review comment:
       please don't do wildcard imports.

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
##########
@@ -76,7 +80,12 @@
       }
 
       if (needToUpdateCollection) {

Review comment:
       We no longer need this boolean; we can check whether downedReplicas is empty to decide.

##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -1609,12 +1586,40 @@ public void publish(final CoreDescriptor cd, final Replica.State state, boolean
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      overseerJobQueue.offer(Utils.toJSON(m));
+      DocCollection coll = zkStateReader.getCollection(collection);
+      if (forcePublish || sendToOverseer(coll, coreNodeName)) {

Review comment:
       other places we do additional checks, like https://github.com/apache/lucene-solr/pull/2187/files#diff-5307cc9f51d88f5a171591d2f429779e22b0168cd2114275d745bccea9d1a6b3R186
   
   Please be consistent about whether we optimize for early termination here (I suspect we don't need to).

##########
File path: solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
##########
@@ -61,7 +60,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Nightly
+//@Nightly

Review comment:
       ?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
##########
@@ -129,11 +143,23 @@ public Replica(String name, Map<String,Object> propMap, String collection, Strin
     Objects.requireNonNull(this.nodeName, "'node_name' must not be null");
     Objects.requireNonNull(this.core, "'core' must not be null");
     Objects.requireNonNull(this.type, "'type' must not be null");
-    if (propMap.get(ZkStateReader.STATE_PROP) != null) {
-      this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
+    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
+      log.debug("A replica  {} state fetched from per-replica state", name);

Review comment:
       nit: I think this would be confusing; people might think it means the fetched state was "A" for Active. I would log "Replica..." instead.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
##########
@@ -1576,14 +1614,24 @@ public static DocCollection getCollectionLive(ZkStateReader zkStateReader, Strin
     }
   }
 
-  private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
-    String collectionPath = getCollectionPath(coll);
+  public DocCollection fetchCollectionState(String coll, Watcher watcher, String path) throws KeeperException, InterruptedException {
+    String collectionPath = path == null ? getCollectionPath(coll) : path;
     while (true) {
+      ClusterState.initReplicaStateProvider(() -> {
+        try {
+          PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
+          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+          return replicaStates;
+        } catch (Exception e) {
+          //TODO

Review comment:
       ?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states
+ * Do not directly manipulate the per replica states as it can become difficult to debug them
+ */
+public class PerReplicaStatesOps {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private PerReplicaStates rs;
+  List<PerReplicaStates.Operation> ops;
+  private boolean preOp = true;
+  final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
+
+  PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
+    this.fun = fun;
+  }
+
+  /**
+   * Persist a set of operations to Zookeeper
+   */
+  private void persist(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    if (operations == null || operations.isEmpty()) return;
+    if (log.isDebugEnabled()) {
+      log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+    }
+
+    List<Op> ops = new ArrayList<>(operations.size());
+    for (PerReplicaStates.Operation op : operations) {
+      //the state of the replica is being updated
+      String path = znode + "/" + op.state.asString;
+      ops.add(op.typ == PerReplicaStates.Operation.Type.ADD ?
+          Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
+          Op.delete(path, -1));
+    }
+    try {
+      zkClient.multi(ops, true);
+    } catch (KeeperException e) {
+      log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
+      throw e;
+    }
+
+  }
+
+  static List<PerReplicaStates.Operation> addDeleteStaleNodes(List<PerReplicaStates.Operation> ops, PerReplicaStates.State rs) {

Review comment:
       This is awkwardly named. Add a Javadoc, or maybe don't call them "stale"? What makes the nodes stale?

##########
File path: solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
##########
@@ -65,7 +65,10 @@ public static void setupCluster() throws Exception {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)

Review comment:
       Repeated: `.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)` is called twice.

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -174,20 +215,30 @@ public boolean hasPendingUpdates() {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if (!hasPendingUpdates()) return clusterState;
+    if ((updates == this.updates)
+        && !hasPendingUpdates()) {
+      return clusterState;
+    }
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          DocCollection c = entry.getValue();
+          ZkWriteCommand cmd = entry.getValue();
+          DocCollection c = cmd.collection;
 
+          if (cmd.ops != null && cmd.ops.isPreOp()) {
+            cmd.ops.persist(path, reader.getZkClient());
+            clusterState = clusterState.copyWith(name,
+                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+          }
+          if (!cmd.persistCollState) continue;

Review comment:
       should this check be earlier? I'm not clear on what the isPreOp commands do.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
##########
@@ -420,5 +435,63 @@ public String toString() {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+  }
+  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {

Review comment:
       Can't we use ThreadLocal.withInitial instead? I don't understand what concurrency constraints we are trying to work with here. Can you add some comments explaining why we are using a ThreadLocal?

##########
File path: solr/CHANGES.txt
##########
@@ -45,6 +47,7 @@ Improvements
 
 * SOLR-15062: /api/cluster/zk/ls should give the stat of the current node (noble)
 
+=======

Review comment:
       ?

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
##########
@@ -281,13 +291,15 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
+      persistCollectionState = true;
     }
 
-    Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
+    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
+    Replica oldReplica = null;
     if (slice != null) {
-      Replica oldReplica = slice.getReplica(coreNodeName);
+      oldReplica = slice.getReplica(coreNodeName);

Review comment:
       ?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
##########
@@ -420,5 +435,63 @@ public String toString() {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;

Review comment:
       Use `Optional.orElse`, e.g. `Optional.ofNullable(REPLICASTATES_PROVIDER.get()).orElse(EMPTYSTATEPROVIDER)`.

##########
File path: solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
##########
@@ -217,6 +221,7 @@ public void testWaitForStateChecksCurrentState() throws Exception {
   }
 
   @Test
+  @Ignore

Review comment:
       ? Why is this test now marked `@Ignore`?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;

Review comment:
       What do duplicates mean in this context? Previous states?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states

Review comment:
       👍 

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D

Review comment:
       Can we put the description of the state, and all of the pieces as a class comment? People might not realize they can look at serialize() for the description.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;
+
+    private State(String serialized, List<String> pieces) {
+      this.asString = serialized;
+      replica = pieces.get(0);
+      version = Integer.parseInt(pieces.get(1));
+      String encodedStatus = pieces.get(2);
+      this.state = Replica.getState(encodedStatus);
+      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+      duplicate = null;
+    }
+
+    public static State parse(String serialized) {
+      List<String> pieces = StrUtils.splitSmart(serialized, ':');
+      if (pieces.size() < 3) return null;
+      return new State(serialized, pieces);
+
+    }
+
+    public State(String replica, Replica.State state, Boolean isLeader, int version) {
+      this(replica, state, isLeader, version, null);
+    }
+
+    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+      this.replica = replica;
+      this.state = state == null ? Replica.State.ACTIVE : state;
+      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+      this.version = version;
+      asString = serialize();
+      this.duplicate = duplicate;
+    }
+
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put(NAME, replica);
+      ew.put(VERSION, version);
+      ew.put(ZkStateReader.STATE_PROP, state.toString());
+      if (isLeader) ew.put(Slice.LEADER, isLeader);
+      ew.putIfNotNull("duplicate", duplicate);
+    }
+
+    private State insert(State duplicate) {
+      assert this.replica.equals(duplicate.replica);
+      if (this.version >= duplicate.version) {
+        if (this.duplicate != null) {
+          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
+        }
+        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+      } else {
+        return duplicate.insert(this);
+      }
+    }
+
+    /**
+     * fetch duplicates entries for this replica
+     */
+    List<State> getDuplicates() {
+      if (duplicate == null) return Collections.emptyList();
+      List<State> result = new ArrayList<>();
+      State current = duplicate;
+      while (current != null) {
+        result.add(current);
+        current = current.duplicate;
+      }
+      return result;
+    }
+
+    private String serialize() {
+      StringBuilder sb = new StringBuilder(replica)
+          .append(":")
+          .append(version)
+          .append(":")
+          .append(state.shortName);
+      if (isLeader) sb.append(":").append("L");
+      return sb.toString();
+    }
+
+
+    @Override
+    public String toString() {
+      return asString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof State) {
+        State that = (State) o;
+        return Objects.equals(this.asString, that.asString);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return asString.hashCode();
+    }
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
+    appendStates(sb);
+    return sb.append("]}").toString();
+  }
+
+  private StringBuilder appendStates(StringBuilder sb) {
+    states.forEachEntry(new BiConsumer<String, State>() {
+      int count = 0;
+      @Override
+      public void accept(String s, State state) {
+        if (count++ > 0) sb.append(", ");
+        sb.append(state.asString);
+        for (State d : state.getDuplicates()) sb.append(d.asString);
+      }
+    });
+    return sb;
+  }

Review comment:
       Move these methods above the definition of State; otherwise it was confusing to me which class they belong to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] noblepaul merged pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
noblepaul merged pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] noblepaul commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
noblepaul commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554584632



##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
##########
@@ -420,5 +435,63 @@ public String toString() {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+  }
+  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {

Review comment:
       It's not about concurrency. It was just one option.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


[GitHub] [lucene-solr] madrob commented on a change in pull request #2187: SOLR-15052 Reducing overseer bottlenecks using per-replica states (8x)

Posted by GitBox <gi...@apache.org>.
madrob commented on a change in pull request #2187:
URL: https://github.com/apache/lucene-solr/pull/2187#discussion_r554164613



##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -64,31 +64,8 @@
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ConnectionManager;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocCollectionWatcher;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.common.cloud.NodesSysPropsCacher;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.*;

Review comment:
       Please don't use wildcard imports.

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
##########
@@ -76,7 +80,12 @@
       }
 
       if (needToUpdateCollection) {

Review comment:
       We no longer need this boolean; we can check whether downedReplicas is empty to decide.

##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -1609,12 +1586,40 @@ public void publish(final CoreDescriptor cd, final Replica.State state, boolean
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      overseerJobQueue.offer(Utils.toJSON(m));
+      DocCollection coll = zkStateReader.getCollection(collection);
+      if (forcePublish || sendToOverseer(coll, coreNodeName)) {

Review comment:
       other places we do additional checks, like https://github.com/apache/lucene-solr/pull/2187/files#diff-5307cc9f51d88f5a171591d2f429779e22b0168cd2114275d745bccea9d1a6b3R186
   
   Please be consistent about whether we care to optimize for early termination (I suspect we don't).

##########
File path: solr/core/src/test/org/apache/solr/handler/TestStressThreadBackup.java
##########
@@ -61,7 +60,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Nightly
+//@Nightly

Review comment:
       ? Was commenting out `@Nightly` intentional, or is it left over from local testing?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
##########
@@ -129,11 +143,23 @@ public Replica(String name, Map<String,Object> propMap, String collection, Strin
     Objects.requireNonNull(this.nodeName, "'node_name' must not be null");
     Objects.requireNonNull(this.core, "'core' must not be null");
     Objects.requireNonNull(this.type, "'type' must not be null");
-    if (propMap.get(ZkStateReader.STATE_PROP) != null) {
-      this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
+    ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
+      log.debug("A replica  {} state fetched from per-replica state", name);

Review comment:
       nit: I think this would be confusing; people might think it means that the state fetched was "A" for Active. I would log "Replica..." instead.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
##########
@@ -1576,14 +1614,24 @@ public static DocCollection getCollectionLive(ZkStateReader zkStateReader, Strin
     }
   }
 
-  private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
-    String collectionPath = getCollectionPath(coll);
+  public DocCollection fetchCollectionState(String coll, Watcher watcher, String path) throws KeeperException, InterruptedException {
+    String collectionPath = path == null ? getCollectionPath(coll) : path;
     while (true) {
+      ClusterState.initReplicaStateProvider(() -> {
+        try {
+          PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
+          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+          return replicaStates;
+        } catch (Exception e) {
+          //TODO

Review comment:
       ? What is this `//TODO` for? Please resolve it or explain what remains to be done.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states
+ * Do not directly manipulate the per replica states as it can become difficult to debug them
+ */
+public class PerReplicaStatesOps {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private PerReplicaStates rs;
+  List<PerReplicaStates.Operation> ops;
+  private boolean preOp = true;
+  final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
+
+  PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
+    this.fun = fun;
+  }
+
+  /**
+   * Persist a set of operations to Zookeeper
+   */
+  private void persist(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    if (operations == null || operations.isEmpty()) return;
+    if (log.isDebugEnabled()) {
+      log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+    }
+
+    List<Op> ops = new ArrayList<>(operations.size());
+    for (PerReplicaStates.Operation op : operations) {
+      //the state of the replica is being updated
+      String path = znode + "/" + op.state.asString;
+      ops.add(op.typ == PerReplicaStates.Operation.Type.ADD ?
+          Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
+          Op.delete(path, -1));
+    }
+    try {
+      zkClient.multi(ops, true);
+    } catch (KeeperException e) {
+      log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
+      throw e;
+    }
+
+  }
+
+  static List<PerReplicaStates.Operation> addDeleteStaleNodes(List<PerReplicaStates.Operation> ops, PerReplicaStates.State rs) {

Review comment:
       This is kind of awkwardly named. Please add a Javadoc, or maybe don't call them stale? What makes the nodes stale?

##########
File path: solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
##########
@@ -65,7 +65,10 @@ public static void setupCluster() throws Exception {
       collection = COLLECTIONORALIAS;
     }
 
-    CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+    CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)

Review comment:
       Repeated: `.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)` is called twice.

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -174,20 +215,30 @@ public boolean hasPendingUpdates() {
    * @throws KeeperException       if any ZooKeeper operation results in an error
    * @throws InterruptedException  if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
-    if (!hasPendingUpdates()) return clusterState;
+    if ((updates == this.updates)
+        && !hasPendingUpdates()) {
+      return clusterState;
+    }
     Timer.Context timerContext = stats.time("update_state");
     boolean success = false;
     try {
       if (!updates.isEmpty()) {
-        for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+        for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
           String name = entry.getKey();
           String path = ZkStateReader.getCollectionPath(name);
-          DocCollection c = entry.getValue();
+          ZkWriteCommand cmd = entry.getValue();
+          DocCollection c = cmd.collection;
 
+          if (cmd.ops != null && cmd.ops.isPreOp()) {
+            cmd.ops.persist(path, reader.getZkClient());
+            clusterState = clusterState.copyWith(name,
+                  cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+          }
+          if (!cmd.persistCollState) continue;

Review comment:
       Should this check come earlier? I'm not clear on what the `isPreOp` commands do.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
##########
@@ -420,5 +435,63 @@ public String toString() {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+  }
+  public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {

Review comment:
       Can't we use `ThreadLocal.withInitial` instead? I don't understand what concurrency constraints we are trying to work with here. Can you add some comments explaining why we are using a ThreadLocal?

##########
File path: solr/CHANGES.txt
##########
@@ -45,6 +47,7 @@ Improvements
 
 * SOLR-15062: /api/cluster/zk/ls should give the stat of the current node (noble)
 
+=======

Review comment:
       ? This `=======` line looks like a leftover merge-conflict marker; please remove it.

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
##########
@@ -281,13 +291,15 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
+      persistCollectionState = true;
     }
 
-    Slice slice = collection != null ?  collection.getSlice(sliceName) : null;
+    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
 
     Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
+    Replica oldReplica = null;
     if (slice != null) {
-      Replica oldReplica = slice.getReplica(coreNodeName);
+      oldReplica = slice.getReplica(coreNodeName);

Review comment:
       ? Why does `oldReplica` now need to be declared outside the `if` block?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
##########
@@ -420,5 +435,63 @@ public String toString() {
   public int size() {
     return collectionStates.size();
   }
+  interface ReplicaStatesProvider {
+
+    Optional<ReplicaStatesProvider> get();
+
+    PerReplicaStates getStates();
+
+  }
+
+  private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+    @Override
+    public Optional<ReplicaStatesProvider> get() {
+      return Optional.empty();
+    }
+
+    @Override
+    public PerReplicaStates getStates() {
+      throw new RuntimeException("Invalid operation");
+    }
+
+  };
+
+  private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+  public static ReplicaStatesProvider getReplicaStatesProvider() {
+    return  (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;

Review comment:
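       Use `Optional.orElse`, e.g. `Optional.ofNullable(REPLICASTATES_PROVIDER.get()).orElse(EMPTYSTATEPROVIDER)`, instead of calling `get()` twice.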

##########
File path: solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
##########
@@ -217,6 +221,7 @@ public void testWaitForStateChecksCurrentState() throws Exception {
   }
 
   @Test
+  @Ignore

Review comment:
       ? Why is this test now `@Ignore`d? Please re-enable it or document the reason.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;

Review comment:
       What do duplicates mean in this context? Previous states?

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states

Review comment:
       👍 

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D

Review comment:
       Can we put the description of the state and all of its pieces in a class comment? People might not realize they can look at `serialize()` for the description.

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection
+ * This is an immutable object. When states are modified, a new instance is constructed
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final char SEPARATOR = ':';
+  //no:of times to retry in case of a CAS failure
+  public static final int MAX_RETRIES = 5;
+
+
+  @JsonProperty
+  public final String path;
+
+  @JsonProperty
+  public final int cversion;
+
+  @JsonProperty
+  public final SimpleMap<State> states;
+
+  /**
+   * Construct with data read from ZK
+   * @param path path from where this is loaded
+   * @param cversion the current child version of the znode
+   * @param states the per-replica states (the list of all child nodes)
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+    Map<String, State> tmp = new LinkedHashMap<>();
+
+    for (String state : states) {
+      State rs = State.parse(state);
+      if (rs == null) continue;
+      State existing = tmp.get(rs.replica);
+      if (existing == null) {
+        tmp.put(rs.replica, rs);
+      } else {
+        tmp.put(rs.replica, rs.insert(existing));
+      }
+    }
+    this.states = new WrappedSimpleMap<>(tmp);
+
+  }
+
+  /**Get the changed replicas
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> result = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(result::add);
+      return result;
+    }
+    old.states.forEachEntry((s, state) -> {
+      // the state is modified or missing
+      if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+    });
+    fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+    });
+    return result;
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
+   * If this is not modified, the same object is returned
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat stat = zkClient.exists(current.path, null, true);
+        if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+        if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
+      }
+      Stat stat = new Stat();
+      List<String> children = zkClient.getChildren(path, null, stat, true);
+      return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  public static String getReplicaName(String s) {
+    int idx = s.indexOf(SEPARATOR);
+    if (idx > 0) {
+      return s.substring(0, idx);
+    }
+    return null;
+  }
+
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  public static class Operation {
+    public final Type typ;
+    public final State state;
+
+    public Operation(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    @Override
+    public String toString() {
+      return typ.toString() + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   * <p>
+   * The serialized form is {@code <replica>:<version>:<state short name>}, with a trailing {@code :L}
+   * when the replica is the leader, e.g. {@code core_node_1:13:A:L}.
+   * <p>
+   * All fields are final; {@code insert} returns a new instance rather than modifying an existing one.
+   */
+  public static class State implements MapWriter {
+
+    /** Name of the replica this entry describes (first field of the serialized form). */
+    public final String replica;
+
+    /** Replica state; the public constructors substitute {@code ACTIVE} for {@code null}. */
+    public final Replica.State state;
+
+    /** Whether the replica is marked as leader (serialized as a trailing {@code :L}). */
+    public final Boolean isLeader;
+
+    /** Version of this entry; among several entries for one replica, the highest version is the latest. */
+    public final int version;
+
+    /** Serialized form of this entry; {@code equals} and {@code hashCode} are based solely on it. */
+    public final String asString;
+
+    /**
+     * Lower-versioned entries for the same replica, if any, chained through this field. For example,
+     * given {@code core_node_1:12:A core_node_1:13:D}, the entry with version 13 is the latest and the
+     * one with version 12 is considered a duplicate.
+     * <p>
+     * These are unlikely, but possible. The chain is not kept sorted by version.
+     */
+    final State duplicate;
+
+    /**
+     * Builds an entry from {@code serialized} after it has been split on ':' into {@code pieces}:
+     * pieces[0] is the replica name, pieces[1] the version (a non-numeric value makes
+     * {@link Integer#parseInt} throw {@link NumberFormatException}), pieces[2] the encoded state, and
+     * an optional pieces[3] equal to "L" marks the leader. Callers must supply at least three pieces.
+     * {@code asString} keeps {@code serialized} verbatim rather than re-serializing it.
+     * <p>
+     * NOTE(review): unlike the public constructor, this does not default a {@code null} result of
+     * {@code Replica.getState} to ACTIVE. Confirm getState never returns null for stored entries.
+     */
+    private State(String serialized, List<String> pieces) {
+      this.asString = serialized;
+      replica = pieces.get(0);
+      version = Integer.parseInt(pieces.get(1));
+      String encodedStatus = pieces.get(2);
+      this.state = Replica.getState(encodedStatus);
+      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+      duplicate = null;
+    }
+
+    /**
+     * Parses a serialized entry such as {@code core_node_1:13:A:L}.
+     *
+     * @param serialized the entry text, split on ':' via {@code StrUtils.splitSmart}
+     * @return the parsed entry, or {@code null} if there are fewer than three pieces
+     * @throws NumberFormatException if the version piece is not an integer
+     */
+    public static State parse(String serialized) {
+      List<String> pieces = StrUtils.splitSmart(serialized, ':');
+      if (pieces.size() < 3) return null;
+      return new State(serialized, pieces);
+
+    }
+
+    /** Creates an entry with no duplicates, applying the same defaults as the five-argument constructor. */
+    public State(String replica, Replica.State state, Boolean isLeader, int version) {
+      this(replica, state, isLeader, version, null);
+    }
+
+    /**
+     * Creates an entry, defaulting a {@code null} state to {@code ACTIVE} and a {@code null}
+     * {@code isLeader} to {@code false}. The serialized form is computed from these values and does
+     * not include {@code duplicate}.
+     */
+    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+      this.replica = replica;
+      this.state = state == null ? Replica.State.ACTIVE : state;
+      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+      this.version = version;
+      asString = serialize();
+      this.duplicate = duplicate;
+    }
+
+    /**
+     * Writes the name, version and state. The leader flag is written only when it is true, and the
+     * duplicate entry only when present.
+     */
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put(NAME, replica);
+      ew.put(VERSION, version);
+      ew.put(ZkStateReader.STATE_PROP, state.toString());
+      if (isLeader) ew.put(Slice.LEADER, isLeader);
+      ew.putIfNotNull("duplicate", duplicate);
+    }
+
+    /**
+     * Merges another entry for the same replica with this one and returns the result; neither input
+     * is modified. The entry with the higher version becomes the head (this one wins ties), and the
+     * other is placed at the front of the head's duplicate chain.
+     * <p>
+     * NOTE(review): when the winning entry already has duplicates, the losing entry is rebuilt with the
+     * winner's chain as its duplicate, so any chain the losing entry itself carried is lost. That is
+     * harmless when one side is a freshly parsed entry with no duplicates. Confirm callers guarantee this.
+     */
+    private State insert(State duplicate) {
+      assert this.replica.equals(duplicate.replica);
+      if (this.version >= duplicate.version) {
+        if (this.duplicate != null) {
+          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
+        }
+        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+      } else {
+        // the argument is newer: let it be the head and push this entry into its chain
+        return duplicate.insert(this);
+      }
+    }
+
+    /**
+     * Returns the duplicate entries chained from this one, in chain order (not sorted by version),
+     * excluding this entry itself; an empty list when there are none.
+     */
+    List<State> getDuplicates() {
+      if (duplicate == null) return Collections.emptyList();
+      List<State> result = new ArrayList<>();
+      State current = duplicate;
+      while (current != null) {
+        result.add(current);
+        current = current.duplicate;
+      }
+      return result;
+    }
+
+    /** Builds {@code replica:version:stateShortName}, plus {@code :L} when leader; duplicates are excluded. */
+    private String serialize() {
+      StringBuilder sb = new StringBuilder(replica)
+          .append(":")
+          .append(version)
+          .append(":")
+          .append(state.shortName);
+      if (isLeader) sb.append(":").append("L");
+      return sb.toString();
+    }
+
+
+    /** Returns the serialized form of this entry (without duplicates). */
+    @Override
+    public String toString() {
+      return asString;
+    }
+
+    /** Two entries are equal when their serialized forms match; duplicate chains are not compared. */
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof State) {
+        State that = (State) o;
+        return Objects.equals(this.asString, that.asString);
+      }
+      return false;
+    }
+
+    /** Consistent with {@code equals}: hashes only the serialized form. */
+    @Override
+    public int hashCode() {
+      return asString.hashCode();
+    }
+  }
+
+
+  /**
+   * Renders as {@code {<path>/[<cversion>]: [<entries>]}}, where the entries are produced by
+   * {@code appendStates}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder out = new StringBuilder();
+    out.append('{').append(path).append("/[").append(cversion).append("]: [");
+    appendStates(out);
+    out.append("]}");
+    return out.toString();
+  }
+
+  /**
+   * Appends the serialized form of every state entry to {@code sb}, separated by {@code ", "}.
+   * Any duplicate entries of a replica are appended right after that replica's primary entry,
+   * using the same separator.
+   *
+   * @param sb the builder to append to
+   * @return {@code sb}, for chaining
+   */
+  private StringBuilder appendStates(StringBuilder sb) {
+    states.forEachEntry(new BiConsumer<String, State>() {
+      int count = 0;
+
+      @Override
+      public void accept(String s, State state) {
+        appendEntry(state);
+        // duplicates need their own separator, otherwise their text runs into the primary entry's
+        for (State d : state.getDuplicates()) appendEntry(d);
+      }
+
+      /** Appends one serialized entry, preceded by ", " unless it is the very first one. */
+      private void appendEntry(State entry) {
+        if (count++ > 0) sb.append(", ");
+        sb.append(entry.asString);
+      }
+    });
+    return sb;
+  }

Review comment:
       Move these methods above the definition of State; otherwise it is confusing which class they belong to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org