You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/12 01:57:08 UTC

[lucene-solr] branch reference_impl updated (320d8c4 -> d119b2e)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


 discard 320d8c4  @1458 Fix some issues with disordered updates.
     new d119b2e  @1458 Fix some issues with disordered updates.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (320d8c4)
            \
             N -- N -- N   refs/heads/reference_impl (d119b2e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 90 ++++++++++++++--------
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java      |  2 +-
 .../src/resources/logconf/log4j2-startup-debug.xml |  2 +-
 3 files changed, 58 insertions(+), 36 deletions(-)


[lucene-solr] 01/01: @1458 Fix some issues with disordered updates.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d119b2ea9510df434cc3d842791281fe0f1d6279
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 19:56:41 2021 -0600

    @1458 Fix some issues with disordered updates.
    
    Took 1 hour 13 minutes
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  50 +-
 .../solr/cloud/OverseerTaskExecutorTask.java       |  35 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |   3 +
 .../apache/solr/cloud/overseer/NodeMutator.java    |   4 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 565 ++++++++++-----------
 .../java/org/apache/solr/core/CoreContainer.java   |   2 +-
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java      |   2 +-
 .../api/collections/CustomCollectionTest.java      |   3 -
 .../apache/solr/common/cloud/ZkStateReader.java    | 158 +++---
 .../src/resources/logconf/log4j2-startup-debug.xml |   3 +-
 10 files changed, 396 insertions(+), 429 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 24c6856..3962539 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -71,6 +71,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -703,10 +704,10 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data, false);
   }
 
-  public Future processQueueItem(ZkNodeProps message) throws InterruptedException {
-    if (log.isDebugEnabled()) log.debug("processQueueItem {}", message);
+  public Future processQueueItem(List<WorkQueueWatcher.StateEntry> shardStateCollections) throws InterruptedException {
+    if (log.isDebugEnabled()) log.debug("processQueueItem {}", shardStateCollections);
 
-    Future future = new OverseerTaskExecutorTask(getCoreContainer(), message).run();
+    Future future = new OverseerTaskExecutorTask(getCoreContainer(), shardStateCollections).run();
 
     return future;
   }
@@ -802,7 +803,7 @@ public class Overseer implements SolrCloseable {
     }
   }
 
-  private static class WorkQueueWatcher extends QueueWatcher {
+  public static class WorkQueueWatcher extends QueueWatcher {
 
     public WorkQueueWatcher(CoreContainer cc, Overseer overseer) throws KeeperException {
       super(cc, overseer, Overseer.OVERSEER_QUEUE);
@@ -835,7 +836,8 @@ public class Overseer implements SolrCloseable {
         }
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
-        Set<String> shardStateCollections = null;
+        List<StateEntry> shardStateCollections = null;
+        Set<String> scollections = null;
         List<Future> futures = new ArrayList<>();
         for (byte[] item : data.values()) {
           final ZkNodeProps message = ZkNodeProps.load(item);
@@ -852,43 +854,52 @@ public class Overseer implements SolrCloseable {
 
             // hack
             if (operation.equals("updateshardstate")) {
-              if (shardStateCollections == null) {
-                shardStateCollections = new HashSet<>();
+              if (scollections == null) {
+                scollections = new HashSet<>();
               }
-              shardStateCollections.add(message.getStr("collection"));
+              scollections.add(message.getStr("collection"));
             }
 
-            Future future = overseer.processQueueItem(message);
-            if (future != null) {
-              futures.add(future);
+            if (shardStateCollections == null) {
+              shardStateCollections = new ArrayList<>();
             }
+            StateEntry entry = new StateEntry();
+            entry.message = message;
+            shardStateCollections.add(entry);
+
           } catch (Exception e) {
             log.error("Overseer state update queue processing failed", e);
           }
         }
-        for (Future future : futures) {
+        Future future = null;
+        try {
+          future = overseer.processQueueItem(shardStateCollections);
+        } catch (Exception e) {
+          log.error("Overseer state update queue processing failed", e);
+        }
+
           try {
             future.get();
           } catch (Exception e) {
             log.error("failed waiting for enqueued updates", e);
           }
-        }
+
         futures.clear();
         Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
         for (String collection : collections) {
           futures.add(overseer.writePendingUpdates(collection));
         }
 
-        for (Future future : futures) {
+        for (Future f : futures) {
           try {
-            future.get();
+            f.get();
           } catch (Exception e) {
             log.error("failed waiting for enqueued updates", e);
           }
         }
         futures.clear();
-        if (shardStateCollections != null) {
-          for (String collection : shardStateCollections) {
+        if (scollections != null) {
+          for (String collection : scollections) {
             futures.add(overseer.writePendingUpdates(collection));
           }
         }
@@ -914,6 +925,11 @@ public class Overseer implements SolrCloseable {
       }
     }
 
+    public static class StateEntry {
+      public ZkNodeProps message;
+      public String znodeName;
+    }
+
     private static class CollectionWorkQueueWatcher extends QueueWatcher {
       private final OverseerCollectionMessageHandler collMessageHandler;
       private final OverseerConfigSetMessageHandler configMessageHandler;
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index c118ac5..499fa4a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 public class OverseerTaskExecutorTask {
@@ -32,36 +34,31 @@ public class OverseerTaskExecutorTask {
   private final ZkController zkController;
   private final SolrCloudManager cloudManager;
   private final SolrZkClient zkClient;
-  private final ZkNodeProps message;
+  private final List<Overseer.WorkQueueWatcher.StateEntry> shardStateCollections;
 
-  public OverseerTaskExecutorTask(CoreContainer cc,  ZkNodeProps message) {
+  public OverseerTaskExecutorTask(CoreContainer cc, List<Overseer.WorkQueueWatcher.StateEntry> shardStateCollections) {
     this.zkController = cc.getZkController();
     this.zkClient = zkController.getZkClient();
     this.cloudManager = zkController.getSolrCloudManager();
-    this.message = message;
+    this.shardStateCollections = shardStateCollections;
   }
 
 
-  private Future processQueueItem(ZkNodeProps message) throws Exception {
-    if (log.isDebugEnabled()) log.debug("Consume state update from queue {} {}", message);
+  private Future processQueueItem(List<Overseer.WorkQueueWatcher.StateEntry> shardStateCollections) throws Exception {
+    if (log.isDebugEnabled()) log.debug("Consume state update from queue {} {}", shardStateCollections);
 
     // assert clusterState != null;
 
     //  if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
 
-    final String operation = message.getStr(Overseer.QUEUE_OPERATION);
-    if (operation == null) {
-      log.error("Message missing " + Overseer.QUEUE_OPERATION + ":" + message);
-      return null;
-    }
-
-    if (log.isDebugEnabled()) log.debug("Queue operation is {}", operation);
-
-    if (log.isDebugEnabled()) log.debug("Process message {} {}", message, operation);
 
-    if (log.isDebugEnabled()) log.debug("Enqueue message {}", operation);
+//    if (log.isDebugEnabled()) log.debug("Queue operation is {}", operation);
+//
+//    if (log.isDebugEnabled()) log.debug("Process message {} {}", message, operation);
+//
+//    if (log.isDebugEnabled()) log.debug("Enqueue message {}", operation);
     try {
-      return zkController.getOverseer().getZkStateWriter().enqueueUpdate(null, message, true);
+      return zkController.getOverseer().getZkStateWriter().enqueueUpdate(null, shardStateCollections, true);
     } catch (NullPointerException e) {
       log.info("Overseer is stopped, won't process message " + zkController.getOverseer());
       return null;
@@ -71,12 +68,12 @@ public class OverseerTaskExecutorTask {
 
 
   public Future run() {
-    if (log.isDebugEnabled()) log.debug("OverseerTaskExecutorTask, going to process message {}", message);
+    if (log.isDebugEnabled()) log.debug("OverseerTaskExecutorTask, going to process message {}", shardStateCollections);
 
     try {
-      return processQueueItem(message);
+      return processQueueItem(shardStateCollections);
     } catch (Exception e) {
-      log.error("Failed to process message " + message, e);
+      log.error("Failed to process message " + shardStateCollections, e);
     }
     return null;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index da8a8cc..0a9c52c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -213,6 +214,8 @@ public class StatePublisher implements Closeable {
           String collection = stateMessage.getStr(ZkStateReader.COLLECTION_PROP);
           String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
 
+          log.debug("submit state for publishing core={} state={}", core, state);
+
           if (core == null || state == null) {
             log.error("Nulls in published state");
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Nulls in published state " + stateMessage);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 438f7ea..ed297e2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -60,9 +60,9 @@ public class NodeMutator {
             throw new RuntimeException("Replica without node name! " + replica);
           }
           if (rNodeName.equals(nodeName)) {
-            log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
+            log.debug("Update replica state for {} to {}", replica, Replica.State.RECOVERING);
             Map<String, Object> props = replica.shallowCopy();
-            props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+            props.put(ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
             Replica newReplica = new Replica(replica.getName(), props, collection, replica.getCollectionId(), slice.getName(), nodeNameToBaseUrl);
             newReplicas.put(replica.getName(), newReplica);
             needToUpdateCollection = true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 1cb99f2..3c247bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -83,7 +83,7 @@ public class ZkStateWriter {
 
   private static class ColState {
     ReentrantLock collLock = new ReentrantLock(true);
-    ActionThrottle throttle = new ActionThrottle("ZkStateWriter", Integer.getInteger("solr.zkstatewriter.throttle", 50), new TimeSource.NanoTimeSource());
+    ActionThrottle throttle = new ActionThrottle("ZkStateWriter", Integer.getInteger("solr.zkstatewriter.throttle", 0), new TimeSource.NanoTimeSource());
   }
 
 
@@ -99,7 +99,7 @@ public class ZkStateWriter {
 
   }
 
-  public Future enqueueUpdate(DocCollection docCollection, ZkNodeProps message, boolean stateUpdate) throws Exception {
+  public Future enqueueUpdate(DocCollection docCollection, List<Overseer.WorkQueueWatcher.StateEntry> shardStateCollections, boolean stateUpdate) throws Exception {
     return ParWork.getRootSharedExecutor().submit(() -> {
 
       try {
@@ -194,235 +194,276 @@ public class ZkStateWriter {
             collState.collLock.unlock();
           }
         } else {
-          final String operation = message.getStr(StatePublisher.OPERATION);
-          OverseerAction overseerAction = OverseerAction.get(operation);
-          if (overseerAction == null) {
-            throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
-          }
-
-          switch (overseerAction) {
-            case STATE:
-              if (log.isDebugEnabled()) log.debug("state cmd {}", message);
-              message.getProperties().remove(StatePublisher.OPERATION);
-
-              Map<String,List<StateUpdate>> collStateUpdates = new HashMap<>();
-              for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
-                if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  continue;
-                } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  continue;
-                } else {
-                  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
-                  String id = entry.getKey();
+          Map<String,List<StateUpdate>> collStateUpdates = new HashMap<>();
+          for (Overseer.WorkQueueWatcher.StateEntry sentry : shardStateCollections) {
+            ZkNodeProps message = sentry.message;
+            final String operation = message.getStr(StatePublisher.OPERATION);
+            OverseerAction overseerAction = OverseerAction.get(operation);
+            if (overseerAction == null) {
+              throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
+            }
 
-                  String stateString = (String) entry.getValue();
-                  if (log.isDebugEnabled()) {
-                    log.debug("stateString={}", stateString);
-                  }
+            switch (overseerAction) {
+              case STATE:
+                if (log.isDebugEnabled()) log.debug("state cmd {}", message);
+                message.getProperties().remove(StatePublisher.OPERATION);
 
-                  long collectionId = Long.parseLong(id.split("-")[0]);
-                  String collection = idToCollection.get(collectionId);
-                  if (collection == null) {
-                    log.info("collection for id={} is null", collectionId);
+                for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+                  if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
                     continue;
-                  }
+                  } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
+                    continue;
+                  } else {
+                    if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+                    String id = entry.getKey();
 
-                  List<StateUpdate> updates = collStateUpdates.get(collection);
-                  if (updates == null) {
-                    updates = new ArrayList<>();
-                    collStateUpdates.put(collection, updates);
-                  }
+                    String stateString = (String) entry.getValue();
+                    if (log.isDebugEnabled()) {
+                      log.debug("stateString={}", stateString);
+                    }
 
-                  StateUpdate update = new StateUpdate();
-                  update.id = id;
-                  update.state = stateString;
-                  updates.add(update);
+                    long collectionId = Long.parseLong(id.split("-")[0]);
+                    String collection = idToCollection.get(collectionId);
+                    if (collection == null) {
+                      log.info("collection for id={} is null", collectionId);
+                      continue;
+                    }
 
-                }
-              }
+                    List<StateUpdate> updates = collStateUpdates.get(collection);
+                    if (updates == null) {
+                      updates = new ArrayList<>();
+                      collStateUpdates.put(collection, updates);
+                    }
 
-              for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
-                if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  continue;
-                } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  continue;
-                } else {
+                    StateUpdate update = new StateUpdate();
+                    update.id = id;
+                    update.state = stateString;
+                    updates.add(update);
 
-                  ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
-                    if (reentrantLock == null) {
-                      ColState colState = new ColState();
-                      return colState;
-                    }
-                    return reentrantLock;
-                  });
-
-                  collState.collLock.lock();
-                  try {
-                    //                if (collection == null) {
-                    //                  Collection<ClusterState.CollectionRef> colls = cs.getCollectionStates().values();
-                    //                  log.info("look for collection for id={} in {}}", id, cs.getCollectionStates().keySet());
-                    //
-                    //                  for (ClusterState.CollectionRef docCollectionRef : colls) {
-                    //                    DocCollection docCollection = docCollectionRef.get();
-                    //                    if (docCollection == null) {
-                    //                      log.info("docCollection={}", docCollection);
-                    //                    }
-                    //                    if (docCollection.getId() == collectionId) {
-                    //                      collection = docCollection.getName();
-                    //                      break;
-                    //                    }
-                    //                  }
-                    //                  if (collection == null) {
-                    //                    continue;
-                    //                  }
-                    //                }
-                    String collection = entry.getKey();
-                    for (StateUpdate state : entry.getValue()) {
-
-                      String setState = Replica.State.shortStateToState(state.state).toString();
-
-                      if (trackVersions.get(collection) == null) {
-                        // reader.forciblyRefreshClusterStateSlow(collection);
-                        DocCollection latestColl = null; //reader.getClusterState().getCollectionOrNull(collection);
-
-                        if (latestColl == null) {
-                          //log.info("no node exists, using version 0");
-                          trackVersions.remove(collection);
-                        } else {
-                          //  cs.getCollectionStates().put(latestColl.getName(), new ClusterState.CollectionRef(latestColl));
-                          //log.info("got version from zk {}", existsStat.getVersion());
-                          int version = latestColl.getZNodeVersion();
-                          log.info("Updating local tracked version to {} for {}", version, collection);
-                          trackVersions.put(collection, version);
-                        }
+                  }
+                }
+                Set<String> blockedNodes = new HashSet<>();
+                for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+                  if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
+                    cs.values().forEach(docColl -> {
+                      List<StateUpdate> updates = collStateUpdates.get(docColl.getName());
+                      if (updates == null) {
+                        updates = new ArrayList<>();
+                        collStateUpdates.put(docColl.getName(), updates);
                       }
+                      List<Replica> replicas = docColl.getReplicas();
+                      for (Replica replica : replicas) {
+                        if (replica.getNodeName().equals(entry.getValue())) {
+                          if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", operation, replica);
+                          // MRM TODO:
+                          //   Slice slice = docColl.getSlice(replica.getSlice());
+                          //          Replica leaderReplica = slice.getLeader();
+                          //          if (leaderReplica != null && replica == leaderReplica) {
+                          //            leaderReplica.getProperties().remove("leader");
+                          //            slice.setLeader(null);
+                          //          }
+                          dirtyState.add(docColl.getName());
+                          blockedNodes.add((String) entry.getValue());
+                          StateUpdate update = new StateUpdate();
+                          update.id = replica.getId();
+                          update.state = Replica.State.getShortState(Replica.State.DOWN);
+                          updates.add(update);
 
-                      ZkNodeProps updates = stateUpdates.get(collection);
-                      if (updates == null) {
-                        updates = new ZkNodeProps();
-                        stateUpdates.put(collection, updates);
+                        }
                       }
-                      Integer ver = trackVersions.get(collection);
-                      if (ver == null) {
-                        ver = 0;
+                    });
+                  }
+                  if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
+                    cs.values().forEach(docColl -> {
+                      List<StateUpdate> updates = collStateUpdates.get(docColl.getName());
+                      if (updates == null) {
+                        updates = new ArrayList<>();
+                        collStateUpdates.put(docColl.getName(), updates);
                       }
-                      updates.getProperties().put("_cs_ver_", ver.toString());
-
-                      log.debug("version for state updates {}", ver.toString());
-
-                      DocCollection docColl = cs.get(collection);
-                      if (docColl != null) {
-                        Replica replica = docColl.getReplicaById(state.id);
-                        log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
-                        if (replica != null) {
-                          if (setState.equals("leader")) {
-                            if (log.isDebugEnabled()) {
-                              log.debug("set leader {}", replica);
-                            }
-                            Slice slice = docColl.getSlice(replica.getSlice());
-                            slice.setLeader(replica);
-                            replica.setState(Replica.State.ACTIVE);
-                            replica.getProperties().put("leader", "true");
-                            Collection<Replica> replicas = slice.getReplicas();
-                            for (Replica r : replicas) {
-                              if (r != replica) {
-                                r.getProperties().remove("leader");
-                              }
-                            }
-                            updates.getProperties().put(replica.getId(), "l");
-                            dirtyState.add(collection);
-                          } else {
-                            Replica.State s = Replica.State.getState(setState);
-                            Replica existingLeader = docColl.getSlice(replica).getLeader();
-                            if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
-                              docColl.getSlice(replica).setLeader(null);
-                            }
-                            updates.getProperties().put(replica.getId(), Replica.State.getShortState(s));
-                            log.debug("set state {} {}", state, replica);
-                            replica.setState(s);
-                            dirtyState.add(collection);
-                          }
-                        } else {
-                          log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
-                        }
-                      } else {
-                        log.debug("Could not find existing collection name={}", collection);
-                        if (setState.equals("leader")) {
-                          updates.getProperties().put(state.id, "l");
-                          dirtyState.add(collection);
-                        } else {
-                          Replica.State s = Replica.State.getState(setState);
-                          updates.getProperties().put(state.id, Replica.State.getShortState(s));
-                          dirtyState.add(collection);
+                      List<Replica> replicas = docColl.getReplicas();
+                      for (Replica replica : replicas) {
+                        if (replica.getNodeName().equals(entry.getValue())) {
+                          if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", operation, replica);
+                          dirtyState.add(docColl.getName());
+                          //   blockedNodes.add((String) entry.getValue());
+                          StateUpdate update = new StateUpdate();
+                          update.id = replica.getId();
+                          update.state = Replica.State.getShortState(Replica.State.RECOVERING);
+                          updates.add(update);
                         }
                       }
-                    }
-                  } finally {
-                    collState.collLock.unlock();
+                    });
                   }
                 }
-              }
 
-              for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
-                if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  if (log.isDebugEnabled()) {
-                    log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+                break;
+              // MRM TODO:
+              //          case ADDROUTINGRULE:
+              //            return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
+              //          case REMOVEROUTINGRULE:
+              //            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
+              case UPDATESHARDSTATE:  // MRM TODO: look at how we handle this and make it so it can use StatePublisher
+                String collection = message.getStr("collection");
+                message.getProperties().remove("collection");
+                message.getProperties().remove(StatePublisher.OPERATION);
+
+                ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
+                  if (reentrantLock == null) {
+                    ColState colState = new ColState();
+                    return colState;
                   }
-                  nodeOperation(entry, Replica.State.getShortState(Replica.State.DOWN));
-                } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
-                  if (log.isDebugEnabled()) {
-                    log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+                  return reentrantLock;
+                });
+
+                collState.collLock.lock();
+                try {
+                  DocCollection docColl = cs.get(collection);
+                  if (docColl != null) {
+                    for (Map.Entry<String,Object> e : message.getProperties().entrySet()) {
+                      Slice slice = docColl.getSlice(e.getKey());
+                      if (slice != null) {
+                        Slice.State state = Slice.State.getState((String) e.getValue());
+                        slice.setState(state);
+                        dirtyStructure.add(collection);
+                      }
+                    }
                   }
-                  nodeOperation(entry, Replica.State.getShortState(Replica.State.RECOVERING));
+                } finally {
+                  collState.collLock.unlock();
                 }
-              }
+                break;
+
+              default:
+                throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
 
-              break;
-            // MRM TODO:
-            //          case ADDROUTINGRULE:
-            //            return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
-            //          case REMOVEROUTINGRULE:
-            //            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
-            case UPDATESHARDSTATE:  // MRM TODO: look at how we handle this and make it so it can use StatePublisher
-              String collection = message.getStr("collection");
-              message.getProperties().remove("collection");
-              message.getProperties().remove(StatePublisher.OPERATION);
-
-              ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
-                if (reentrantLock == null) {
-                  ColState colState = new ColState();
-                  return colState;
+            }
+          }
+          for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+
+            ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
+              if (reentrantLock == null) {
+                ColState colState = new ColState();
+                return colState;
+              }
+              return reentrantLock;
+            });
+
+            collState.collLock.lock();
+            try {
+              //                if (collection == null) {
+              //                  Collection<ClusterState.CollectionRef> colls = cs.getCollectionStates().values();
+              //                  log.info("look for collection for id={} in {}}", id, cs.getCollectionStates().keySet());
+              //
+              //                  for (ClusterState.CollectionRef docCollectionRef : colls) {
+              //                    DocCollection docCollection = docCollectionRef.get();
+              //                    if (docCollection == null) {
+              //                      log.info("docCollection={}", docCollection);
+              //                    }
+              //                    if (docCollection.getId() == collectionId) {
+              //                      collection = docCollection.getName();
+              //                      break;
+              //                    }
+              //                  }
+              //                  if (collection == null) {
+              //                    continue;
+              //                  }
+              //                }
+              String collection = entry.getKey();
+              for (StateUpdate state : entry.getValue()) {
+
+                String setState = Replica.State.shortStateToState(state.state).toString();
+
+                ZkNodeProps updates = stateUpdates.get(collection);
+                if (updates == null) {
+                  updates = new ZkNodeProps();
+                  stateUpdates.put(collection, updates);
                 }
-                return reentrantLock;
-              });
+                Integer ver = trackVersions.get(collection);
+                if (ver == null) {
+                  ver = 0;
+                }
+                updates.getProperties().put("_cs_ver_", ver.toString());
+
+                log.debug("version for state updates {}", ver.toString());
 
-              collState.collLock.lock();
-              try {
                 DocCollection docColl = cs.get(collection);
                 if (docColl != null) {
-                  for (Map.Entry<String,Object> e : message.getProperties().entrySet()) {
-                    Slice slice = docColl.getSlice(e.getKey());
-                    if (slice != null) {
-                      Slice.State state = Slice.State.getState((String) e.getValue());
-                      slice.setState(state);
-                      dirtyStructure.add(collection);
+                  Replica replica = docColl.getReplicaById(state.id);
+                  log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+                  if (replica != null) {
+                    //                        if (blockedNodes.contains(replica.getNodeName())) {
+                    //                          continue;
+                    //                        }
+                    log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
+                    if (setState.equals("leader")) {
+                      if (log.isDebugEnabled()) {
+                        log.debug("set leader {}", replica);
+                      }
+                      Slice slice = docColl.getSlice(replica.getSlice());
+                      slice.setLeader(replica);
+                      replica.setState(Replica.State.ACTIVE);
+                      replica.getProperties().put("leader", "true");
+                      Collection<Replica> replicas = slice.getReplicas();
+                      for (Replica r : replicas) {
+                        if (r != replica) {
+                          r.getProperties().remove("leader");
+                        }
+                      }
+                      updates.getProperties().put(replica.getId(), "l");
+                      dirtyState.add(collection);
+                    } else {
+                      Replica.State s = Replica.State.getState(setState);
+                      Replica existingLeader = docColl.getSlice(replica).getLeader();
+                      if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
+                        docColl.getSlice(replica).setLeader(null);
+                      }
+                      updates.getProperties().put(replica.getId(), Replica.State.getShortState(s));
+                      log.debug("set state {} {}", state, replica);
+                      replica.setState(s);
+                      dirtyState.add(collection);
                     }
+                  } else {
+                    log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
+                  }
+                } else {
+                  log.debug("Could not find existing collection name={}", collection);
+                  if (setState.equals("leader")) {
+                    updates.getProperties().put(state.id, "l");
+                    dirtyState.add(collection);
+                  } else {
+                    Replica.State s = Replica.State.getState(setState);
+                    updates.getProperties().put(state.id, Replica.State.getShortState(s));
+                    dirtyState.add(collection);
                   }
                 }
-              } finally {
-                collState.collLock.unlock();
               }
-              break;
 
-            default:
-              throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
+
+            } finally {
+              collState.collLock.unlock();
+            }
           }
+          for (Map.Entry<String,List<StateUpdate>> update : collStateUpdates.entrySet()) {
 
+            String coll = update.getKey();
+            Integer ver = trackVersions.get(coll);
+            if (ver == null) {
+              ver = 0;
+            }
+            ZkNodeProps updateMap = stateUpdates.get(coll);
+            if (updateMap == null) {
+              updateMap = new ZkNodeProps();
+              stateUpdates.put(coll, updateMap);
+            }
+            updateMap.getProperties().put("_cs_ver_", ver.toString());
+            for (StateUpdate theUpdate : update.getValue()) {
+              updateMap.getProperties().put(theUpdate.id, theUpdate.state);
+            }
+          }
         }
 
 
 
-
       } catch (Exception e) {
         log.error("Exception while queuing update", e);
         throw e;
@@ -430,75 +471,6 @@ public class ZkStateWriter {
     });
   }
 
-  private void nodeOperation(Map.Entry<String,Object> entry, String operation) {
-    log.debug("zkwriter set node operation {} for {} cs={}}", operation, entry.getValue(), cs);
-
-
-    //
-    //    if (cs.getCollectionStates().size() == 0) {
-    //        reader.forciblyRefreshAllClusterStateSlow();
-    //        clusterState = reader.getClusterState().copy();
-    //       log.debug("set operation try again {} for {} cs={}}", operation, entry.getValue(), clusterState);
-    //       // cs = clusterState;
-    //    }
-
-    cs.values().forEach(docColl -> {
-      ColState collState = collLocks.compute(docColl.getName(), (s, reentrantLock) -> {
-        if (reentrantLock == null) {
-          ColState colState = new ColState();
-          return colState;
-        }
-        return reentrantLock;
-      });
-
-      collState.collLock.lock();
-      try {
-        if (trackVersions.get(docColl.getName()) == null) {
-          // reader.forciblyRefreshClusterStateSlow(docColl.getName());
-
-          //log.info("got version from zk {}", existsStat.getVersion());
-          int version = docColl.getZNodeVersion();
-          log.info("Updating local tracked version to {} for {}", version, docColl.getName());
-          trackVersions.put(docColl.getName(), version);
-          idToCollection.put(docColl.getId(), docColl.getName());
-
-        }
-
-        ZkNodeProps updates = stateUpdates.get(docColl.getName());
-        if (updates == null) {
-          updates = new ZkNodeProps();
-          stateUpdates.put(docColl.getName(), updates);
-        }
-        Integer ver = trackVersions.get(docColl.getName());
-        if (ver == null) {
-          ver = 1;
-        }
-        updates.getProperties().put("_cs_ver_", ver.toString());
-        //     dirtyState.add(docColl.getName());
-        //   dirtyStructure.add(docColl.getName());
-        List<Replica> replicas = docColl.getReplicas();
-        log.debug("update replicas with node operation {} reps={}", operation, replicas.size());
-        for (Replica replica : replicas) {
-          if (!Replica.State.getShortState(replica.getState()).equals(operation) && replica.getNodeName().equals(entry.getValue())) {
-            if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", operation, replica);
-            // MRM TODO:
-            //   Slice slice = docColl.getSlice(replica.getSlice());
-            //          Replica leaderReplica = slice.getLeader();
-            //          if (leaderReplica != null && replica == leaderReplica) {
-            //            leaderReplica.getProperties().remove("leader");
-            //            slice.setLeader(null);
-            //          }
-            replica.setState(Replica.State.shortStateToState(operation));
-            updates.getProperties().put(replica.getId(), operation);
-            dirtyState.add(docColl.getName());
-          }
-        }
-      } finally {
-        collState.collLock.unlock();
-      }
-    });
-  }
-
   public Integer lastWrittenVersion(String collection) {
     return trackVersions.get(collection);
   }
@@ -543,7 +515,7 @@ public class ZkStateWriter {
 
     if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
     Integer version = null;
-    if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
+    if (dirtyStructure.contains(collection.getName())) {
       log.info("process collection {}", collection);
       ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
         if (reentrantLock == null) {
@@ -565,39 +537,40 @@ public class ZkStateWriter {
         if (log.isTraceEnabled()) log.trace("process {}", collection);
         try {
           // log.info("get data for {}", name);
-          byte[] data = Utils.toJSON(singletonMap(name, collection));
+
           //  log.info("got data for {} {}", name, data.length);
 
           try {
 
-            if (dirtyStructure.contains(collection.getName())) {
-              if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+            if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+            byte[] data = Utils.toJSON(singletonMap(name, collection));
+            Integer v = trackVersions.get(collection.getName());
 
-              Integer v = trackVersions.get(collection.getName());
+            if (v != null) {
+              //log.info("got version from cache {}", v);
+              version = v;
+            } else {
+              version = 0;
+            }
+            lastVersion.set(version);
+            if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
 
-              if (v != null) {
-                //log.info("got version from cache {}", v);
-                version = v;
-              } else {
-                version = 0;
-              }
-              lastVersion.set(version);
-              if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+            reader.getZkClient().setData(path, data, version, true, false);
+            if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
 
-              reader.getZkClient().setData(path, data, version, true, false);
-              if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
-              trackVersions.put(collection.getName(), version + 1);
+            reader.getZkClient().setData(pathSCN, null, -1, true, false);
 
-              reader.getZkClient().setData(pathSCN, null, -1, true, false);
-              dirtyStructure.remove(collection.getName());
+            dirtyStructure.remove(collection.getName());
 
-              ZkNodeProps updates = stateUpdates.get(collection.getName());
-              if (updates != null) {
-                updates.getProperties().clear();
-                writeStateUpdates(collection, updates);
-              }
+            ZkNodeProps updates = stateUpdates.get(collection.getName());
+            if (updates != null) {
+              updates.getProperties().clear();
+              //(collection, updates);
+              //dirtyState.remove(collection.getName());
             }
 
+            trackVersions.put(collection.getName(), version + 1);
+
           } catch (KeeperException.NoNodeException e) {
             if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
 
@@ -622,13 +595,6 @@ public class ZkStateWriter {
             throw bve;
           }
 
-          if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
-            ZkNodeProps updates = stateUpdates.get(collection.getName());
-            if (updates != null) {
-              writeStateUpdates(collection, updates);
-            }
-          }
-
         } catch (KeeperException.BadVersionException bve) {
           badVersionException.set(bve);
         } catch (InterruptedException | AlreadyClosedException e) {
@@ -642,6 +608,17 @@ public class ZkStateWriter {
       }
     }
 
+    if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
+      ZkNodeProps updates = stateUpdates.get(collection.getName());
+      if (updates != null) {
+        try {
+          writeStateUpdates(collection, updates);
+        } catch (Exception e) {
+          log.error("exception writing state updates", e);
+        }
+      }
+    }
+
     //removeCollections.forEach(c ->  removeCollection(c));
 
     if (badVersionException.get() != null) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index c7a3b9f..2280546 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -870,7 +870,7 @@ public class CoreContainer implements Closeable {
           getZkController().getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (n, c) -> {
             if (c == null) {
               if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
-              return true;
+              return false;
             }
 
             String nodeName = getZkController().getNodeName();
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 6157d2d..88ea75e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -171,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
       assertTrue(String.valueOf(indexThread.getFailCount()), indexThread.getFailCount() < 10);
     }
 
-    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 30, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
       if (collectionState == null) return false;
       Collection<Slice> slices = collectionState.getSlices();
       for (Slice slice : slices) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
index 822ecda..613adc6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
@@ -28,15 +28,12 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.params.ShardParams._ROUTE_;
 
 /**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index a278b0b..5fd312b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1814,9 +1814,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     try {
       log.debug("get and process state updates for {}", coll);
       byte[] data = null;
-
+      Stat stat = new Stat();
       try {
-        data = getZkClient().getData(stateUpdatesPath, null, null, true, false);
+        data = getZkClient().getData(stateUpdatesPath, null, stat, true, false);
       } catch (NoNodeException e) {
         log.info("No node found for {}", stateUpdatesPath);
         return docCollection;
@@ -1837,18 +1837,28 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (log.isDebugEnabled()) log.debug("Got additional state updates with version {} {} cs={}", version, m, clusterState);
 
       m.remove("_cs_ver_");
-
+      m.put("_ver_", stat.getVersion());
       try {
         Set<Entry<String,Object>> entrySet = m.entrySet();
 
         if (docCollection != null) {
           // || (version > docCollection.getZNodeVersion() && clusterState.getZkClusterStateVersion() == -1)) {
-          if (version < docCollection.getZNodeVersion()) {
+          if (!docCollection.hasStateUpdates() && version < docCollection.getZNodeVersion()) {
             if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
             return docCollection;
           }
+
+          if (docCollection.hasStateUpdates()) {
+            int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
+            if (stat.getVersion() < oldVersion) {
+              if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
+              return docCollection;
+            }
+          }
+
           for (Entry<String,Object> entry : entrySet) {
             String id = entry.getKey();
+            if (id.equals("_ver_")) continue;
             Replica.State state = null;
             if (!entry.getValue().equals("l")) {
               state = Replica.State.shortStateToState((String) entry.getValue());
@@ -1948,49 +1958,49 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     String collectionPath = getCollectionPath(coll);
     if (log.isDebugEnabled()) log.debug("Looking at fetching full clusterstate collection={}", coll);
 
-//    int version = 0;
-//
-//    Stat stateStat = zkClient.exists(collectionPath, null, true, false);
-//    if (stateStat != null) {
-//      version = stateStat.getVersion();
-//      if (log.isDebugEnabled()) log.debug("version for cs is {}", version);
-//      // version we would get
-//      DocCollection docCollection = watchedCollectionStates.get(coll);
-//      if (docCollection != null) {
-//        int localVersion = docCollection.getZNodeVersion();
-//        if (log.isDebugEnabled())
-//          log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
-//        if (docCollection.hasStateUpdates()) {
-//          if (localVersion > version) {
-//            return docCollection;
-//          }
-//        } else {
-//          if (localVersion >= version) {
-//            return docCollection;
-//          }
-//        }
-//      }
-//
-//      if (lazyCollectionStates.containsKey(coll)) {
-//        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
-//        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
-//        if (cachedCollection != null) {
-//          int localVersion = cachedCollection.getZNodeVersion();
-//          if (cachedCollection.hasStateUpdates()) {
-//            if (localVersion > version) {
-//              return cachedCollection;
-//            }
-//          } else {
-//            if (localVersion >= version) {
-//              return cachedCollection;
-//            }
-//          }
-//        }
-//
-//      }
-//    } else {
-//      return null;
-//    }
+    int version = 0;
+
+    Stat stateStat = zkClient.exists(collectionPath, null, true, false);
+    if (stateStat != null) {
+      version = stateStat.getVersion();
+      if (log.isDebugEnabled()) log.debug("version for cs is {}", version);
+      // version we would get
+      DocCollection docCollection = watchedCollectionStates.get(coll);
+      if (docCollection != null) {
+        int localVersion = docCollection.getZNodeVersion();
+        if (log.isDebugEnabled())
+          log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
+        if (docCollection.hasStateUpdates()) {
+          if (localVersion == version) {
+            return docCollection;
+          }
+        } else {
+          if (localVersion == version) {
+            return docCollection;
+          }
+        }
+      }
+
+      if (lazyCollectionStates.containsKey(coll)) {
+        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+        if (cachedCollection != null) {
+          int localVersion = cachedCollection.getZNodeVersion();
+          if (cachedCollection.hasStateUpdates()) {
+            if (localVersion == version) {
+              return cachedCollection;
+            }
+          } else {
+            if (localVersion == version) {
+              return cachedCollection;
+            }
+          }
+        }
+
+      }
+    } else {
+      return null;
+    }
     if (log.isDebugEnabled()) log.debug("getting latest state.json");
     Stat stat = new Stat();
     byte[] data;
@@ -2002,8 +2012,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     if (data == null) return null;
     ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
     ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
-    return collectionRef == null ? null : collectionRef.get();
-
+    DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+    return docCollection;
   }
 
   public static String getCollectionPathRoot(String coll) {
@@ -2210,7 +2220,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
-    
+
+//    DocCollection coll = getCollectionOrNull(collection);
+//    if (predicate.matches(liveNodes, coll)) {
+//      return;
+//    }
     final CountDownLatch latch = new CountDownLatch(1);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
@@ -2395,55 +2409,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (newState == null) {
         if (log.isDebugEnabled()) log.debug("Removing cached collection state for [{}]", coll);
         watchedCollectionStates.remove(coll);
-//        CollectionStateWatcher sw = stateWatchersMap.remove(coll);
-//        if (sw != null) sw.removeWatch();
-//        IOUtils.closeQuietly(sw);
-//        lazyCollectionStates.remove(coll);
-//        if (collectionRemoved != null) {
-//          collectionRemoved.removed(coll);
-//        }
-//
-//        clusterState.remove(coll);
         return true;
       }
 
       if (live) {
         return true;
       }
-//      boolean updated = false;
-//      // CAS update loop
-//      while (true) {
-//        if (!collectionWatches.containsKey(coll)) {
-//          break;
-//        }
-//        DocCollection oldState = watchedCollectionStates.get(coll);
-//        if (oldState == null) {
-//          if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
-//            if (log.isDebugEnabled()) {
-//              log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
-//            }
-//            updated = true;
-//            break;
-//          }
-//        } else {
-//          if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
-//            // no change to state, but we might have been triggered by the addition of a
-//            // state watcher, so run notifications
-//            updated = true;
-//            break;
-//          }
-//        }
-//      DocCollection old = watchedCollectionStates.put(coll, newState);
-//      while (old != null && old.getZNodeVersion() > newState.getZNodeVersion()) {
-//        newState = old;
-       watchedCollectionStates.put(coll, newState);
-     //  clusterState.put(coll, new ClusterState.CollectionRef(newState));
-      // Resolve race with unregisterCore.
+
+      watchedCollectionStates.put(coll, newState);
       if (!collectionWatches.containsKey(coll)) {
-        watchedCollectionStates.remove(coll);
-        log.debug("Removing uninteresting collection [{}]", coll);
+        lazyCollectionStates.remove(coll);
       }
-  //    }
 
 
       return true;
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
index ca7f633..6f81795 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
@@ -62,13 +62,14 @@
         <AsyncLogger name="org.apache.solr.cloud.LeaderElector" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.cloud.ShardLeaderElectionContextBase" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.servlet.HttpSolrCall" level="DEBUG"/>
-
+        <AsyncLogger name="org.apache.solr.cloud.OverseerTaskExecutorTask" level="DEBUG"/>
 
         <!-- <AsyncLogger name="org.apache.solr.common.cloud.SolrZkClient" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.cloud.overseer.SliceMutator" level="DEBUG"/>-->
         <AsyncLogger name="org.apache.solr.client.solrj.impl.LBSolrClient" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.cloud.ZkController" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.common.cloud.ZkStateReader" level="DEBUG"/>
+        <AsyncLogger name="org.apache.solr.cloud.StatePublisher" level="DEBUG"/>
 
         <AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.core.CoreContainer" level="DEBUG"/>