You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/13 04:09:40 UTC

[lucene-solr] 01/01: @1467 Cleanup after stress work.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 17f883d9a4490f2812f829cab524f8af5205e591
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Mar 12 22:06:54 2021 -0600

    @1467 Cleanup after stress work.
    
    Took 10 minutes
---
 .../java/org/apache/solr/cloud/LeaderElector.java  |   4 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   | 233 +++-----
 .../solr/cloud/OverseerTaskExecutorTask.java       | 102 ----
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  22 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   2 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |  46 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  24 +-
 .../apache/solr/cloud/api/collections/Assign.java  |   2 +-
 .../api/collections/CollectionCmdResponse.java     |   7 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   6 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |  27 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   4 +-
 .../OverseerCollectionMessageHandler.java          |  12 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   8 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 621 ++++++++++-----------
 .../java/org/apache/solr/core/CoreContainer.java   |  23 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |   7 +-
 .../solr/handler/admin/CollectionsHandler.java     |   2 +-
 .../apache/solr/rest/ManagedResourceStorage.java   |   2 +-
 .../java/org/apache/solr/update/CommitTracker.java |   2 +-
 .../java/org/apache/solr/update/UpdateHandler.java |   5 +-
 .../src/java/org/apache/solr/update/UpdateLog.java |   4 +-
 .../java/org/apache/solr/update/VersionInfo.java   |   2 +-
 .../processor/DistributedUpdateProcessor.java      |   2 +-
 .../CreateCollectionsIndexAndRestartTest.java      |   2 +-
 .../cloud/api/collections/TestCollectionAPI.java   |   4 +-
 .../solrj/impl/SolrClientNodeStateProvider.java    |   2 +-
 .../org/apache/solr/common/cloud/ClusterState.java |  14 +
 .../solr/common/cloud/ConnectionManager.java       |   2 +-
 .../apache/solr/common/cloud/DocCollection.java    |   6 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  65 +--
 .../apache/solr/common/cloud/ZkStateReader.java    | 198 +++----
 35 files changed, 615 insertions(+), 853 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 8ac4d79..71873ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -206,7 +206,7 @@ public class LeaderElector implements Closeable {
           try {
             String watchedNode = holdElectionPath + "/" + toWatch;
 
-            log.info("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
+            log.debug("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
 
             ElectionWatcher oldWatcher = watcher;
             if (oldWatcher != null) {
@@ -405,7 +405,7 @@ public class LeaderElector implements Closeable {
           leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
         }
 
-        log.info("Joined leadership election with path: {}", leaderSeqPath);
+        log.debug("Joined leadership election with path: {}", leaderSeqPath);
         context.leaderSeqPath = leaderSeqPath;
         state = JOIN;
         break;
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index aec7014..5ae5e4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -74,6 +74,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -705,16 +706,8 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data, false);
   }
 
-  public Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws InterruptedException {
-    if (log.isDebugEnabled()) log.debug("processQueueItem {}", collStateUpdates);
-
-    Future future = new OverseerTaskExecutorTask(getCoreContainer(), collStateUpdates).run();
-
-    return future;
-  }
-
   public Future writePendingUpdates(String collection) {
-    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
+    return zkStateWriter.writePendingUpdates(collection);
   }
 
   private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -812,15 +805,12 @@ public class Overseer implements SolrCloseable {
 
     public void start() throws KeeperException, InterruptedException {
       if (closed) return;
-      ourLock.lock();
-      try {
-        if (closed) return;
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
-        startItems = super.getItems();
-        log.info("Overseer found entries on start {}", startItems);
+
+      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+      startItems = super.getItems();
+      log.info("Overseer found entries on start {}", startItems);
+      if (startItems.size() > 0) {
         processQueueItems(startItems, true);
-      } finally {
-        ourLock.unlock();
       }
     }
 
@@ -828,7 +818,9 @@ public class Overseer implements SolrCloseable {
     protected void processQueueItems(List<String> items, boolean onStart) {
       if (closed) return;
       List<String> fullPaths = new ArrayList<>(items.size());
+      CountDownLatch delCountDownLatch = null;
       ourLock.lock();
+      String forceWrite = null;
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -836,33 +828,22 @@ public class Overseer implements SolrCloseable {
         }
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
-        List<StateEntry> shardStateCollections = null;
-        Set<String> scollections = null;
-        List<Future> futures = new ArrayList<>();
-        for (byte[] item : data.values()) {
-          final ZkNodeProps message = ZkNodeProps.load(item);
-          try {
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (onStart) {
-              if (operation.equals("state")) {
-                message.getProperties().remove(OverseerAction.DOWNNODE);
-                if (message.getProperties().size() == 1) {
-                  continue;
-                }
-              }
-            }
 
-            // hack
-            if (operation.equals("updateshardstate")) {
-              if (scollections == null) {
-                scollections = new HashSet<>();
-              }
-              scollections.add(message.getStr("collection"));
+        if (fullPaths.size() > 0) {
+          if (!zkController.getZkClient().isClosed()) {
+            try {
+              delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
+            } catch (Exception e) {
+              log.warn("Failed deleting processed items", e);
             }
+          }
+        }
 
-            if (shardStateCollections == null) {
-              shardStateCollections = new ArrayList<>();
-            }
+        final List<StateEntry> shardStateCollections = new ArrayList<>();
+
+        for (byte[] item : data.values()) {
+          final ZkNodeProps message = ZkNodeProps.load(item);
+          try {
             StateEntry entry = new StateEntry();
             entry.message = message;
             shardStateCollections.add(entry);
@@ -872,8 +853,9 @@ public class Overseer implements SolrCloseable {
           }
         }
         Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates = new HashMap<>();
-        try {
-          for (Overseer.StateEntry sentry : shardStateCollections) {
+
+        for (Overseer.StateEntry sentry : shardStateCollections) {
+          try {
             ZkNodeProps stateUpdateMessage = sentry.message;
             final String op = stateUpdateMessage.getStr(StatePublisher.OPERATION);
             OverseerAction overseerAction = OverseerAction.get(op);
@@ -888,34 +870,9 @@ public class Overseer implements SolrCloseable {
 
                 for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
                   if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
-                    continue;
-                  } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
-                    continue;
-                  } else {
-                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
-                    String id = stateUpdateEntry.getKey();
-
-                    String stateString = (String) stateUpdateEntry.getValue();
-                    if (log.isDebugEnabled()) {
-                      log.debug("stateString={}", stateString);
-                    }
-                    String collId = id.substring(0, id.indexOf('-'));
-                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
-                    if (updates == null) {
-                      updates = new ArrayList<>();
-                      collStateUpdates.put(collId, updates);
+                    if (onStart) {
+                      continue;
                     }
-
-                    ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
-                    update.id = id;
-                    update.state = stateString;
-                    updates.add(update);
-
-                  }
-                }
-
-                for (Map.Entry<String,Object> m : stateUpdateMessage.getProperties().entrySet()) {
-                  if (OverseerAction.DOWNNODE.equals(OverseerAction.get(m.getKey()))) {
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
                       List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -925,7 +882,7 @@ public class Overseer implements SolrCloseable {
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
-                        if (replica.getNodeName().equals(m.getValue())) {
+                        if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
                           if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
@@ -934,8 +891,7 @@ public class Overseer implements SolrCloseable {
                         }
                       }
                     });
-                  }
-                  if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(m.getKey()))) {
+                  } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
                       List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -945,7 +901,7 @@ public class Overseer implements SolrCloseable {
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
-                        if (replica.getNodeName().equals(m.getValue())) {
+                        if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
                           if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
@@ -954,6 +910,25 @@ public class Overseer implements SolrCloseable {
                         }
                       }
                     });
+                  } else {
+                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
+                    String id = stateUpdateEntry.getKey();
+
+                    String stateString = (String) stateUpdateEntry.getValue();
+                    if (log.isDebugEnabled()) {
+                      log.debug("stateString={}", stateString);
+                    }
+                    String collId = id.substring(0, id.indexOf('-'));
+                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                    if (updates == null) {
+                      updates = new ArrayList<>();
+                      collStateUpdates.put(collId, updates);
+                    }
+
+                    ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+                    update.id = id;
+                    update.state = stateString;
+                    updates.add(update);
                   }
                 }
 
@@ -964,27 +939,29 @@ public class Overseer implements SolrCloseable {
               //          case REMOVEROUTINGRULE:
               //            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
               case UPDATESHARDSTATE:  // MRM TODO: look at how we handle this and make it so it can use StatePublisher
-                  String collection = stateUpdateMessage.getStr("collection");
-                  stateUpdateMessage.getProperties().remove("collection");
-                  stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
-                  Long collIdLong = zkStateWriter.getCS().get(collection).getId();
-                  if (collIdLong != null) {
-                    String collId = Long.toString(collIdLong);
-                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
-                    if (updates == null) {
-                      updates = new ArrayList<>();
-                      collStateUpdates.put(collId, updates);
-                    }
+                String collection = stateUpdateMessage.getStr("collection");
+                stateUpdateMessage.getProperties().remove("collection");
+                stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
+                Long collIdLong = zkStateWriter.getCS().get(collection).getId();
+                if (collIdLong != null) {
+                  String collId = Long.toString(collIdLong);
+                  List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                  if (updates == null) {
+                    updates = new ArrayList<>();
+                    collStateUpdates.put(collId, updates);
+                  }
 
-                    if (collIdLong != null) {
-                      for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
-                        ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
-                        update.sliceState = (String) stateUpdateEntry.getValue();
-                        update.sliceName = stateUpdateEntry.getKey();
-                        updates.add(update);
-                      }
+                  if (collIdLong != null) {
+                    for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
+                      // MRM TODO: slice state should be done like replica state, this is a hack
+                      forceWrite = collection;
+                      ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+                      update.sliceState = (String) stateUpdateEntry.getValue();
+                      update.sliceName = stateUpdateEntry.getKey();
+                      updates.add(update);
                     }
                   }
+                }
 
                 break;
 
@@ -992,61 +969,40 @@ public class Overseer implements SolrCloseable {
                 throw new RuntimeException("unknown operation:" + op + " contents:" + stateUpdateMessage.getProperties());
 
             }
+          } catch (Exception e) {
+            log.error("Overseer state update queue processing failed", e);
+            continue;
           }
-
-        } catch (Exception e) {
-          log.error("Overseer state update queue processing failed", e);
         }
 
-        Future future = null;
         try {
-          future = overseer.processQueueItem(collStateUpdates);
+          getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
         } catch (Exception e) {
           log.error("Overseer state update queue processing failed", e);
         }
 
-        try {
-          future.get();
-        } catch (Exception e) {
-          log.error("failed waiting for enqueued updates", e);
-        }
-
-        futures.clear();
         Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
         for (String collection : collections) {
-          futures.add(overseer.writePendingUpdates(collection));
+          overseer.writePendingUpdates(collection);
         }
 
-        for (Future f : futures) {
-          try {
-            f.get();
-          } catch (Exception e) {
-            log.error("failed waiting for enqueued updates", e);
-          }
-        }
-        futures.clear();
-        if (scollections != null) {
-          for (String collection : scollections) {
-            futures.add(overseer.writePendingUpdates(collection));
-          }
+        if (collections.size() == 0 && forceWrite != null) {
+          overseer.writePendingUpdates(forceWrite);
         }
-        //        for (Future future : futures) {
-        //          try {
-        //            future.get();
-        //          } catch (Exception e) {
-        //            log.error("failed waiting for enqueued updates", e);
-        //          }
-        //        }
-      } finally {
 
-        if (overseer.zkStateWriter != null) {
-          if (zkController.getZkClient().isAlive()) {
+      } finally {
+        try {
+          if (delCountDownLatch != null) {
             try {
-              zkController.getZkClient().delete(fullPaths, true);
-            } catch (Exception e) {
-              log.warn("Failed deleting processed items", e);
+              boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
+
+              if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+
+            } catch (InterruptedException e) {
+              ParWork.propagateInterrupt(e);
             }
           }
+        } finally {
           ourLock.unlock();
         }
       }
@@ -1085,17 +1041,14 @@ public class Overseer implements SolrCloseable {
     @Override
     public void start() throws KeeperException, InterruptedException {
       if (closed) return;
-      ourLock.lock();
-      try {
-        if (closed) return;
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        startItems = super.getItems();
+      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        log.info("Overseer found entries on start {}", startItems);
+      startItems = super.getItems();
+
+      log.info("Overseer found entries on start {}", startItems);
+      if (startItems.size() > 0) {
         processQueueItems(startItems, true);
-      } finally {
-        ourLock.unlock();
       }
     }
 
@@ -1105,7 +1058,7 @@ public class Overseer implements SolrCloseable {
       ourLock.lock();
       List<String> fullPaths = new ArrayList<>(items.size());
       try {
-        log.info("Found collection queue items {} onStart={}", items, onStart);
+        log.debug("Found collection queue items {} onStart={}", items, onStart);
         for (String item : items) {
           fullPaths.add(path + "/" + item);
         }
@@ -1216,7 +1169,7 @@ public class Overseer implements SolrCloseable {
           } else {
             byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
             completedMap.update(entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1), sdata);
-            log.info("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
+            log.debug("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
           }
 
         } catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
deleted file mode 100644
index e0f5f67..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.cloud.overseer.ZkStateWriter;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.core.CoreContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-public class OverseerTaskExecutorTask {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ZkController zkController;
-  private final SolrCloudManager cloudManager;
-  private final SolrZkClient zkClient;
-  private final Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates;
-
-  public OverseerTaskExecutorTask(CoreContainer cc, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) {
-    this.zkController = cc.getZkController();
-    this.zkClient = zkController.getZkClient();
-    this.cloudManager = zkController.getSolrCloudManager();
-    this.collStateUpdates = collStateUpdates;
-  }
-
-
-  private Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws Exception {
-    if (log.isDebugEnabled()) log.debug("Consume state update from queue {} {}", collStateUpdates);
-
-    // assert clusterState != null;
-
-    //  if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
-
-
-//    if (log.isDebugEnabled()) log.debug("Queue operation is {}", operation);
-//
-//    if (log.isDebugEnabled()) log.debug("Process message {} {}", message, operation);
-//
-//    if (log.isDebugEnabled()) log.debug("Enqueue message {}", operation);
-    try {
-      return zkController.getOverseer().getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
-    } catch (NullPointerException e) {
-      log.info("Overseer is stopped, won't process message " + zkController.getOverseer());
-      return null;
-    }
-
-  }
-
-
-  public Future run() {
-    if (log.isDebugEnabled()) log.debug("OverseerTaskExecutorTask, going to process message {}", collStateUpdates);
-
-    try {
-      return processQueueItem(collStateUpdates);
-    } catch (Exception e) {
-      log.error("Failed to process message " + collStateUpdates, e);
-    }
-    return null;
-  }
-
-  public static class WriteTask implements Runnable {
-    private final String collection;
-    CoreContainer coreContainer;
-
-    public WriteTask(CoreContainer coreContainer, String collection) {
-      this.collection = collection;
-      this.coreContainer = coreContainer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
-      } catch (NullPointerException e) {
-        if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
-      } catch (Exception e) {
-        log.error("Failed to process pending updates", e);
-      }
-    }
-  }
-}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index efb514c..88e138e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -309,7 +309,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
   @Override
   final public void run() {
     // set request info for logging
-    log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
+    log.debug("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
     try {
       try (SolrCore core = cc.getCore(coreName)) {
         if (core == null) {
@@ -508,7 +508,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
   public final boolean doSyncOrReplicateRecovery(SolrCore core, Replica leader) throws Exception {
-    log.info("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
+    log.debug("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
 
     boolean successfulRecovery = false;
     boolean publishedActive = false;
@@ -555,10 +555,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         if (startingVersions.isEmpty()) {
-          log.info("startupVersions is empty");
+          log.debug("startupVersions is empty");
         } else {
-          if (log.isInfoEnabled()) {
-            log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
+          if (log.isDebugEnabled()) {
+            log.debug("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
                 startingVersions.get(startingVersions.size() - 1));
           }
         }
@@ -588,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
 
     if (replicaType == Replica.Type.TLOG) {
-      log.info("Stopping replication from leader for {}", coreName);
+      log.debug("Stopping replication from leader for {}", coreName);
       zkController.stopReplicationFromLeader(coreName);
     }
 
-    log.info("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
+    log.debug("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
 
     zkController.publish(core.getCoreDescriptor(), Replica.State.BUFFERING);
 
@@ -612,7 +612,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           return false;
         }
 
-        log.info("Begin buffering updates. core=[{}]", coreName);
+        log.debug("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
 
@@ -644,16 +644,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
             }
             if (syncSuccess) {
               SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
-              log.info("PeerSync was successful, commit to force open a new searcher");
+              log.debug("PeerSync was successful, commit to force open a new searcher");
               // force open a new searcher
               core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
               req.close();
-              log.info("PeerSync stage of recovery was successful.");
+              log.debug("PeerSync stage of recovery was successful.");
 
               // solrcloud_debug
               // cloudDebugLog(core, "synced");
 
-              log.info("Replaying updates buffered during PeerSync.");
+              log.debug("Replaying updates buffered during PeerSync.");
               replay(core);
 
               // sync success
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 261264c..353e857 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -77,7 +77,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
             // We do this by using a multi and ensuring the parent znode of the leader registration node
             // matches the version we expect - there is a setData call that increments the parent's znode
             // version whenever a leader registers.
-            log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
+            log.debug("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
             List<Op> ops = new ArrayList<>(3);
             ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
             ops.add(Op.delete(leaderSeqPath, -1));
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 0a9c52c1..8491334 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -79,7 +79,6 @@ public class StatePublisher implements Closeable {
 
     @Override
     public void run() {
-      ActionThrottle throttle = new ActionThrottle("StatePublisherWorker", 0);
 
       while (!terminated && !zkStateReader.getZkClient().isClosed()) {
         if (!zkStateReader.getZkClient().isConnected()) {
@@ -92,8 +91,7 @@ public class StatePublisher implements Closeable {
           }
           continue;
         }
-        throttle.minimumWaitBetweenActions();
-        throttle.markAttemptingAction();
+
         ZkNodeProps message = null;
         ZkNodeProps bulkMessage = new ZkNodeProps();
         bulkMessage.getProperties().put(OPERATION, "state");
@@ -101,33 +99,39 @@ public class StatePublisher implements Closeable {
           try {
             message = workQueue.poll(1000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
-            ParWork.propagateInterrupt(e, true);
-            return;
+
           }
           if (message != null) {
-            if (log.isDebugEnabled()) log.debug("Got state message " + message);
+            log.debug("Got state message " + message);
             if (message == TERMINATE_OP) {
+              log.debug("State publish is terminated");
               terminated = true;
-              message = null;
+              return;
             } else {
               bulkMessage(message, bulkMessage);
             }
 
-            while (message != null && !terminated) {
+            while (!terminated) {
               try {
-                message = workQueue.poll(5, TimeUnit.MILLISECONDS);
+                message = workQueue.poll(100, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
-
+                log.warn("state publisher interrupted", e);
+                return;
               }
-              if (log.isDebugEnabled()) log.debug("Got state message " + message);
               if (message != null) {
+                if (log.isDebugEnabled()) log.debug("Got state message " + message);
                 if (message == TERMINATE_OP) {
                   terminated = true;
                 } else {
                   bulkMessage(message, bulkMessage);
                 }
+              } else {
+                break;
               }
             }
+          }
+
+          if (bulkMessage.getProperties().size() > 1) {
             processMessage(bulkMessage);
           }
 
@@ -141,25 +145,28 @@ public class StatePublisher implements Closeable {
       }
     }
 
-    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
+    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
+      if (log.isDebugEnabled()) log.debug("Bulk state zkNodeProps={} bulkMessage={}", zkNodeProps, bulkMessage);
       if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
         String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+        //clearStatesForNode(bulkMessage, nodeName);
         bulkMessage.getProperties().put(OverseerAction.DOWNNODE.toLower(), nodeName);
+        log.debug("bulk state publish down node, props={} result={}", zkNodeProps, bulkMessage);
 
-        clearStatesForNode(bulkMessage, nodeName);
       } else if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.RECOVERYNODE) {
+        log.debug("bulk state publish recovery node, props={} result={}", zkNodeProps, bulkMessage);
         String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+       // clearStatesForNode(bulkMessage, nodeName);
         bulkMessage.getProperties().put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
-
-        clearStatesForNode(bulkMessage, nodeName);
+        log.debug("bulk state publish recovery node complete, props={} result={}", zkNodeProps, bulkMessage);
       } else {
-        String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+        //String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
         String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
         String id = zkNodeProps.getStr("id");
         String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
 
         String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
-        if (log.isDebugEnabled()) log.debug("Bulk publish core={} id={} line={}", core, id, line);
+        if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
         bulkMessage.getProperties().put(id, line);
       }
     }
@@ -188,11 +195,8 @@ public class StatePublisher implements Closeable {
     }
 
     private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
-      if (message.getProperties().size() <= 1) {
-        return;
-      }
+      log.debug("Send state updates to Overseer {}", message);
       byte[] updates = Utils.toJSON(message);
-      if (log.isDebugEnabled()) log.debug("Send state updates to Overseer {}", message);
       overseerJobQueue.offer(updates);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5d85691..680e57c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -325,8 +325,7 @@ public class ZkController implements Closeable, Runnable {
           try {
             zkController.register(descriptor.getName(), descriptor, afterExpiration);
           } catch (Exception e) {
-            log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration);
-            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+            log.error("Error registering core name={} afterExpiration={}", descriptor.getName(), afterExpiration, e);
           }
         }
         return descriptor;
@@ -1263,7 +1262,7 @@ public class ZkController implements Closeable, Runnable {
       final String collection = cloudDesc.getCollectionName();
       final String shardId = cloudDesc.getShardId();
 
-      log.info("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+      log.debug("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
 
       DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
       if (docCollection != null) {
@@ -1274,14 +1273,9 @@ public class ZkController implements Closeable, Runnable {
         }
       }
 
-      // multiple calls of this method will left only one watcher
+      log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
 
-      getZkStateReader().registerCore(cloudDesc.getCollectionName(), coreName);
-
-
-      log.info("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
-
-      log.info("Register terms for replica {}", coreName);
+      log.debug("Register terms for replica {}", coreName);
 
       registerShardTerms(collection, cloudDesc.getShardId(), coreName);
 
@@ -1333,7 +1327,7 @@ public class ZkController implements Closeable, Runnable {
             throw new AlreadyClosedException();
           }
 
-          log.info("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
+          log.debug("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
         }
       }
 
@@ -1345,9 +1339,9 @@ public class ZkController implements Closeable, Runnable {
 
       boolean isLeader = leaderName.equals(coreName);
 
-      log.info("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
+      log.debug("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
 
-      log.info("Check if we should recover isLeader={}", isLeader);
+      log.debug("Check if we should recover isLeader={}", isLeader);
       //assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
 
       // recover from local transaction log and wait for it to complete before
@@ -1415,7 +1409,7 @@ public class ZkController implements Closeable, Runnable {
       // MRM TODO:
      // registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
 
-      log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+      log.debug("SolrCore Registered, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
 
       desc.getCloudDescriptor().setHasRegistered(true);
 
@@ -1581,7 +1575,7 @@ public class ZkController implements Closeable, Runnable {
    */
   public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
     MDCLoggingContext.setCoreName(cd.getName());
-    log.info("publishing state={}", state);
+    log.debug("publishing state={}", state);
     String collection = cd.getCloudDescriptor().getCollectionName();
     String shardId = cd.getCloudDescriptor().getShardId();
     Map<String,Object> props = new HashMap<>();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index 774e77a..f99fc02 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -105,7 +105,7 @@ public class Assign {
     int cnt = overseer.getZkStateWriter().getReplicaAssignCnt(collection.getName(), shard);
 
     String corename = String.format(Locale.ROOT, "%s%s", namePrefix, cnt);
-    log.info("Assigned SolrCore name={} id={}", corename, cnt);
+    log.debug("Assigned SolrCore name={} id={}", corename, cnt);
     ReplicaName replicaName = new ReplicaName();
     replicaName.coreName = corename;
     replicaName.id = cnt;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
index e498d4a..1a5c92f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
@@ -111,7 +111,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
       ShardRequestTracker shardRequestTracker, @SuppressWarnings({"rawtypes"})NamedList results)
       throws IOException, InterruptedException, KeeperException {
 
-    log.info("addReplica() : {}", Utils.toJSONString(message));
+    log.debug("addReplica() : {}", Utils.toJSONString(message));
 
     String extCollectionName = message.getStr(COLLECTION_PROP);
     boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
@@ -194,7 +194,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
 
       ModifiableSolrParams params = getReplicaParams(collection, message, results, skipCreateReplicaInClusterState, shardHandler, createReplica);
 
-      log.info("create replica {} params={}", createReplica, params);
+      log.debug("create replica {} params={}", createReplica, params);
       if (!onlyUpdateState) {
         shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
       }
@@ -220,7 +220,6 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
           public Response call() {
             if (!onlyUpdateState && createdShardHandler) {
               try {
-                 log.info("Processs responses");
                 shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
               } catch (Exception e) {
                 ParWork.propagateInterrupt(e);
@@ -357,7 +356,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
   public static CreateReplica assignReplicaDetails(DocCollection coll,
                                                  ZkNodeProps message, ReplicaPosition replicaPosition, Overseer overseer) {
 
-    log.info("assignReplicaDetails {} {}", message, replicaPosition);
+    log.debug("assignReplicaDetails {} {}", message, replicaPosition);
 
     boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 0ba35e8..be522c6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -279,7 +279,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
             ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(), ZkStateReader.NUM_SHARDS_PROP, message.getStr(ZkStateReader.NUM_SHARDS_PROP), "shards", message.getStr("shards"),
             CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
         props.getProperties().putAll(addReplicaProps.getProperties());
-        if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
+        log.debug("Sending state update to populate clusterstate with new replica {}", props);
 
         clusterState = new CollectionCmdResponse(ocmh, true).call(clusterState, props, results).clusterState;
         // log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
@@ -319,8 +319,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         coresToCreate.put(coreName, sreq);
       }
 
-      Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
-      future.get();
+      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
+
       writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
 
       if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 91851f9..64c4377 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -183,7 +183,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       //  MRM TODO: - put this in finalizer and finalizer after all calls to allow parallel and forward momentum
       try {
-        overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false).get();
+        overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false);
       } catch (Exception e) {
         log.error("failure", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 29ea39c..9cb3980 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -197,7 +196,7 @@ public class DeleteReplicaCmd implements Cmd {
 
           if (waitForFinalState) {
             try {
-              ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
+              ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false);
               ocmh.overseer.writePendingUpdates(finalCollectionName1);
               waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
             } catch (Exception e) {
@@ -251,8 +250,8 @@ public class DeleteReplicaCmd implements Cmd {
         shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
       }
     }
-    List<OverseerCollectionMessageHandler.Finalize> finalizers = new ArrayList<>();
-    List<Future> futures = new ArrayList<>();
+    List<CollectionCmdResponse.Response> finalizers = new ArrayList<>();
+
     for (Map.Entry<Slice,Set<String>> entry : shardToReplicasMapping.entrySet()) {
       Slice shardSlice = entry.getKey();
       String shardId = shardSlice.getName();
@@ -265,26 +264,16 @@ public class DeleteReplicaCmd implements Cmd {
         clusterState = resp.clusterState;
         if (clusterState != null) {
           try {
-            futures.add(ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false));
+            ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
           } catch (Exception e) {
             log.error("failed sending update to zkstatewriter", e);
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
           }
         }
-        if (resp.asyncFinalRunner != null) {
-          finalizers.add(resp.asyncFinalRunner);
-        }
-      }
 
-      try {
-        for (Future future : futures) {
-          future.get();
-        }
-        ocmh.overseer.writePendingUpdates(collectionName);
-      } catch (Exception e) {
-        log.error("failed writing update to zkstatewriter", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        finalizers.add(resp);
       }
+
       results.add("shard_id", shardId);
       results.add("replicas_deleted", replicas);
     }
@@ -294,8 +283,8 @@ public class DeleteReplicaCmd implements Cmd {
     response.asyncFinalRunner = () -> {
       CollectionCmdResponse.Response resp = new CollectionCmdResponse.Response();
       resp.asyncFinalRunner = () -> {
-        for (OverseerCollectionMessageHandler.Finalize finalize : finalizers) {
-          finalize.call();
+        for (CollectionCmdResponse.Response finalize : finalizers) {
+          finalize.asyncFinalRunner.call();
         }
         return new CollectionCmdResponse.Response();
       };
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 5fbb7f9..a7f00bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -177,7 +177,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
       if (waitForFinalState) {
-        ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
+        ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false);
         ocmh.overseer.writePendingUpdates(collectionName).get();
         ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
           if (coll == null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index e9f6aaa..5fc01e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -278,7 +278,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
 
     CollectionCmdResponse.Response response = ocmh.addReplicaWithResp(clusterState, addReplicasProps, addResult);
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false).get();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false);
 
     // wait for the other replica to be active if the source replica was a leader
 
@@ -309,7 +309,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         try {
           response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
           String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
-          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false);
           asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
         } catch (SolrException e) {
           deleteResult.add("failure", e.toString());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index faa10df..c6f98da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -320,19 +320,18 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", responce.clusterState, results);
 
         CollectionCmdResponse.Response asyncResp = null;
-        Future future = null;
+
         if (responce.clusterState != null) {
           DocCollection docColl = responce.clusterState.getCollectionOrNull(collection);
 
           if (docColl != null) {
 
-            future = zkWriter.enqueueUpdate(docColl, null, false);
+            zkWriter.enqueueUpdate(docColl, null, false);
 
             if (responce != null && responce.asyncFinalRunner != null) {
               asyncResp = responce.asyncFinalRunner.call();
             }
             if (asyncResp == null || asyncResp.writeFuture == null) {
-              future.get();
               writeFuture2 = overseer.writePendingUpdates(collection);
             }
 
@@ -349,13 +348,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", asyncResp.clusterState);
           if (asyncResp.clusterState != null) {
             DocCollection docColl = asyncResp.clusterState.getCollectionOrNull(collection);
-
             if (docColl != null) {
-
-              zkWriter.enqueueUpdate(docColl, null, false).get();
-              if (future != null) {
-                future.get();
-              }
+              zkWriter.enqueueUpdate(docColl, null, false);
               writeFuture = overseer.writePendingUpdates(collection);
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 2b0fdec..3a55cd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -100,9 +100,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     for (ZkNodeProps sourceReplica : sourceReplicas) {
       @SuppressWarnings({"rawtypes"}) NamedList nl = new NamedList();
       String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
-      }
+
+      log.debug("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+
       String targetNode = target;
       if (targetNode == null) {
         Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
@@ -146,7 +146,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       runners.add(runner);
     }
     String collection = clusterState.getCollectionStates().keySet().iterator().next();
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false);
     ocmh.overseer.writePendingUpdates(collection);
 
     CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 793ac18..b19b3fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -349,7 +349,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 //        firstReplicaFutures.add(future);
       }
 
-      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
+      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false);
       ocmh.overseer.writePendingUpdates(collectionName);
 
       log.info("Clusterstate after adding new shard for split {}", clusterState);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index a16683a..cd80b5d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 
-import org.apache.solr.cloud.ActionThrottle;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.cloud.api.collections.Assign;
@@ -44,7 +43,6 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -66,12 +64,8 @@ public class ZkStateWriter {
 
   protected volatile Stats stats;
 
-  private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>(128, 0.75f, 16);
-
   private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
 
-  Map<String,DocCollection> failedUpdates = new ConcurrentHashMap<>();
-
   Map<Long,String> idToCollection = new ConcurrentHashMap<>(128, 0.75f, 16);
 
   private Map<String,DocAssign> assignMap = new ConcurrentHashMap<>(128, 0.75f, 16);
@@ -82,10 +76,8 @@ public class ZkStateWriter {
 
   private static class ColState {
     ReentrantLock collLock = new ReentrantLock(true);
-    ActionThrottle throttle = new ActionThrottle("ZkStateWriter", Integer.getInteger("solr.zkstatewriter.throttle", 0), new TimeSource.NanoTimeSource());
   }
 
-
   private AtomicLong ID = new AtomicLong();
 
   private Set<String> dirtyStructure = ConcurrentHashMap.newKeySet();
@@ -98,227 +90,206 @@ public class ZkStateWriter {
 
   }
 
-  public Future enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
-    return ParWork.getRootSharedExecutor().submit(() -> {
-
-      try {
-        if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={} docCollection={} cs={}", stateUpdate, docCollection, cs);
-        if (!stateUpdate) {
-
-          String collectionName = docCollection.getName();
-
-          ColState collState = collLocks.compute(collectionName, (s, colState) -> {
-            if (colState == null) {
-              ColState cState = new ColState();
-              return cState;
-            }
-            return colState;
-          });
-          collState.collLock.lock();
-
-          try {
+  public void enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
 
-            DocCollection currentCollection = cs.get(docCollection.getName());
-            log.debug("zkwriter collection={}", docCollection);
-            log.debug("zkwriter currentCollection={}", currentCollection);
-
-            idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+    try {
 
-            //          if (currentCollection != null) {
-            //            if (currentCollection.getId() != collection.getId()) {
-            //              removeCollection(collection.getName());
-            //            }
-            //          }
+      if (!stateUpdate) {
+        if (log.isDebugEnabled()) log.debug("enqueue structure change docCollection={}", docCollection);
 
-            if (currentCollection != null) {
+        String collectionName = docCollection.getName();
+        ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+          if (colState == null) {
+            ColState cState = new ColState();
+            return cState;
+          }
+          return colState;
+        });
+        collState.collLock.lock();
+        try {
 
-              currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
-              List<String> removeSlices = new ArrayList();
-              for (Slice slice : docCollection) {
-                Slice currentSlice = currentCollection.getSlice(slice.getName());
-                if (currentSlice != null) {
-                  if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
-                    removeSlices.add(slice.getName());
-                  } else {
-                    currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
-                  }
+          DocCollection currentCollection = cs.get(docCollection.getName());
+          log.debug("zkwriter collection={}", docCollection);
+          log.debug("zkwriter currentCollection={}", currentCollection);
+          dirtyStructure.add(docCollection.getName());
+          idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+
+          if (currentCollection != null) {
+            docCollection.setZnodeVersion(currentCollection.getZNodeVersion());
+            currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
+            List<String> removeSlices = new ArrayList();
+            for (Slice slice : docCollection) {
+              Slice currentSlice = currentCollection.getSlice(slice.getName());
+              if (currentSlice != null) {
+                if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
+                  removeSlices.add(slice.getName());
                 } else {
-                  if (slice.getProperties().get("remove") != null) {
-                    continue;
-                  }
-                  Set<String> remove = new HashSet<>();
+                  currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
+                }
+              } else {
+                if (slice.getProperties().get("remove") != null) {
+                  continue;
+                }
+                Set<String> remove = new HashSet<>();
 
-                  for (Replica replica : slice) {
+                for (Replica replica : slice) {
 
-                    if (replica.get("remove") != null) {
-                      remove.add(replica.getName());
-                    }
-                  }
-                  for (String removeReplica : remove) {
-                    slice.getReplicasMap().remove(removeReplica);
+                  if (replica.get("remove") != null) {
+                    remove.add(replica.getName());
                   }
-                  currentCollection.getSlicesMap().put(slice.getName(), slice);
                 }
-              }
-              for (String removeSlice : removeSlices) {
-                currentCollection.getSlicesMap().remove(removeSlice);
-              }
-              cs.put(currentCollection.getName(), currentCollection);
-
-            } else {
-              docCollection.getProperties().remove("pullReplicas");
-              docCollection.getProperties().remove("replicationFactor");
-              docCollection.getProperties().remove("maxShardsPerNode");
-              docCollection.getProperties().remove("nrtReplicas");
-              docCollection.getProperties().remove("tlogReplicas");
-              List<String> removeSlices = new ArrayList();
-              for (Slice slice : docCollection) {
-                Slice currentSlice = docCollection.getSlice(slice.getName());
-                if (currentSlice != null) {
-                  if (slice.getProperties().get("remove") != null) {
-                    removeSlices.add(slice.getName());
-                  }
+                for (String removeReplica : remove) {
+                  slice.getReplicasMap().remove(removeReplica);
                 }
+                currentCollection.getSlicesMap().put(slice.getName(), slice);
               }
-              for (String removeSlice : removeSlices) {
-                docCollection.getSlicesMap().remove(removeSlice);
+            }
+            for (String removeSlice : removeSlices) {
+              currentCollection.getSlicesMap().remove(removeSlice);
+            }
+            cs.put(currentCollection.getName(), currentCollection);
+
+          } else {
+            docCollection.getProperties().remove("pullReplicas");
+            docCollection.getProperties().remove("replicationFactor");
+            docCollection.getProperties().remove("maxShardsPerNode");
+            docCollection.getProperties().remove("nrtReplicas");
+            docCollection.getProperties().remove("tlogReplicas");
+            List<String> removeSlices = new ArrayList();
+            for (Slice slice : docCollection) {
+              Slice currentSlice = docCollection.getSlice(slice.getName());
+              if (currentSlice != null) {
+                if (slice.getProperties().get("remove") != null) {
+                  removeSlices.add(slice.getName());
+                }
               }
-
-              cs.put(docCollection.getName(), docCollection);
+            }
+            for (String removeSlice : removeSlices) {
+              docCollection.getSlicesMap().remove(removeSlice);
             }
 
-            dirtyStructure.add(collectionName);
-
-          } finally {
-            collState.collLock.unlock();
+            cs.put(docCollection.getName(), docCollection);
           }
-        } else {
 
-          for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
-
-            ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
-              if (reentrantLock == null) {
-                ColState colState = new ColState();
-                return colState;
-              }
-              return reentrantLock;
-            });
+        } finally {
+          collState.collLock.unlock();
+        }
+      } else {
+        if (log.isDebugEnabled()) log.debug("enqueue state change states={}", collStateUpdates);
+        for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+
+          ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
+            if (reentrantLock == null) {
+              ColState colState = new ColState();
+              return colState;
+            }
+            return reentrantLock;
+          });
 
-            collState.collLock.lock();
-            try {
-              String collectionId = entry.getKey();
-              String collection = idToCollection.get(Long.parseLong(collectionId));
-              if (collection == null) {
-                log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
-                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
-              }
+          collState.collLock.lock();
+          try {
+            String collectionId = entry.getKey();
+            String collection = idToCollection.get(Long.parseLong(collectionId));
+            if (collection == null) {
+              log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
+            }
 
-              ZkNodeProps updates = stateUpdates.get(collection);
-              if (updates == null) {
-                updates = new ZkNodeProps();
-                stateUpdates.put(collection, updates);
-              }
+            ZkNodeProps updates = stateUpdates.get(collection);
+            if (updates == null) {
+              updates = new ZkNodeProps();
+              stateUpdates.put(collection, updates);
+            }
 
+            DocCollection docColl = cs.get(collection);
+            String csVersion;
+            if (docColl != null) {
+              csVersion = Integer.toString(docColl.getZNodeVersion());
               for (StateUpdate state : entry.getValue()) {
-
-                Integer ver = trackVersions.get(collection);
-                if (ver == null) {
-                  ver = 0;
+                if (state.sliceState != null) {
+                  Slice slice = docColl.getSlice(state.sliceName);
+                  if (slice != null) {
+                    slice.setState(Slice.State.getState(state.sliceState));
+                  }
+                  dirtyStructure.add(collection);
+                  continue;
                 }
-                updates.getProperties().put("_cs_ver_", ver.toString());
-
-                log.debug("version for state updates {}", ver.toString());
 
-                DocCollection docColl = cs.get(collection);
-                if (docColl != null) {
-
-                  if (state.sliceState != null) {
-                    Slice slice = docColl.getSlice(state.sliceName);
-                    if (slice != null) {
-                      slice.setState(Slice.State.getState(state.sliceState));
+                Replica replica = docColl.getReplicaById(state.id);
+                log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+                if (replica != null) {
+                  String setState = Replica.State.shortStateToState(state.state).toString();
+                  log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
+                  if (setState.equals("leader")) {
+                    if (log.isDebugEnabled()) {
+                      log.debug("set leader {}", replica);
                     }
-                    dirtyStructure.add(docColl.getName());
-                    continue;
-                  }
-
-                  Replica replica = docColl.getReplicaById(state.id);
-                  log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
-                  if (replica != null) {
-                    String setState = Replica.State.shortStateToState(state.state).toString();
-                    //                        if (blockedNodes.contains(replica.getNodeName())) {
-                    //                          continue;
-                    //                        }
-                    log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
-                    if (setState.equals("leader")) {
-                      if (log.isDebugEnabled()) {
-                        log.debug("set leader {}", replica);
-                      }
-                      Slice slice = docColl.getSlice(replica.getSlice());
-                      slice.setLeader(replica);
-                      replica.setState(Replica.State.ACTIVE);
-                      replica.getProperties().put("leader", "true");
-                      Collection<Replica> replicas = slice.getReplicas();
-                      for (Replica r : replicas) {
-                        if (r != replica) {
-                          r.getProperties().remove("leader");
-                        }
-                      }
-                      updates.getProperties().put(replica.getInternalId(), "l");
-                      dirtyState.add(collection);
-                    } else {
-                      Replica.State s = Replica.State.getState(setState);
-                      Replica existingLeader = docColl.getSlice(replica).getLeader();
-                      if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
-                        docColl.getSlice(replica).setLeader(null);
+                    Slice slice = docColl.getSlice(replica.getSlice());
+                    slice.setLeader(replica);
+                    replica.setState(Replica.State.ACTIVE);
+                    replica.getProperties().put("leader", "true");
+                    Collection<Replica> replicas = slice.getReplicas();
+                    for (Replica r : replicas) {
+                      if (r != replica) {
+                        r.getProperties().remove("leader");
                       }
-                      updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
-                      log.debug("set state {} {}", state, replica);
-                      replica.setState(s);
-                      dirtyState.add(collection);
                     }
-                  } else {
-                    log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
-                  }
-                } else {
-                  log.debug("Could not find existing collection name={}", collection);
-                  String setState = Replica.State.shortStateToState(state.state).toString();
-                  if (setState.equals("leader")) {
-                    updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+                    updates.getProperties().put(replica.getInternalId(), "l");
                     dirtyState.add(collection);
                   } else {
                     Replica.State s = Replica.State.getState(setState);
-                    updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+                    Replica existingLeader = docColl.getSlice(replica).getLeader();
+                    if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
+                      docColl.getSlice(replica).setLeader(null);
+                    }
+                    updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
+                    log.debug("set state {} {}", state, replica);
+                    replica.setState(s);
                     dirtyState.add(collection);
                   }
+                } else {
+                  log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
                 }
               }
-
-              String coll = entry.getKey();
-              dirtyState.add(coll);
-              Integer ver = trackVersions.get(coll);
-              if (ver == null) {
-                ver = 0;
-              }
-              updates.getProperties().put("_cs_ver_", ver.toString());
-              for (StateUpdate theUpdate : entry.getValue()) {
-                updates.getProperties().put(theUpdate.id.substring(theUpdate.id.indexOf("-") + 1), theUpdate.state);
+            } else {
+              for (StateUpdate state : entry.getValue()) {
+                log.warn("Could not find existing collection name={}", collection);
+//                String setState = Replica.State.shortStateToState(state.state).toString();
+//                if (setState.equals("leader")) {
+//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+//                  dirtyState.add(collection);
+//                } else {
+//                  Replica.State s = Replica.State.getState(setState);
+//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+//                  dirtyState.add(collection);
+//                }
               }
+              log.debug("version for state updates 0");
+              csVersion = "0";
+            }
 
-            } finally {
-              collState.collLock.unlock();
+            if (dirtyState.contains(collection)) {
+              updates.getProperties().put("_cs_ver_", csVersion);
             }
+
+          } finally {
+            collState.collLock.unlock();
           }
         }
-        
-      } catch (Exception e) {
-        log.error("Exception while queuing update", e);
-        throw e;
       }
-    });
+
+    } catch (Exception e) {
+      log.error("Exception while queuing update", e);
+      throw e;
+    }
   }
 
   public Integer lastWrittenVersion(String collection) {
-    return trackVersions.get(collection);
+    DocCollection col = cs.get(collection);
+    if (col == null) {
+      return 0;
+    }
+    return col.getZNodeVersion();
   }
 
   /**
@@ -326,31 +297,29 @@ public class ZkStateWriter {
    *
    */
 
-  // if additional updates too large, publish structure change
-  public void writePendingUpdates(String collection) {
-
-    do {
-      try {
-        write(collection);
-        break;
-      } catch (KeeperException.BadVersionException e) {
-
-      } catch (Exception e) {
-        log.error("write pending failed", e);
-        break;
-      }
+  public Future writePendingUpdates(String collection) {
+    return ParWork.getRootSharedExecutor().submit(() -> {
+      do {
+        try {
+          write(collection);
+          break;
+        } catch (KeeperException.BadVersionException e) {
 
-    } while (!overseer.isClosed());
+        } catch (Exception e) {
+          log.error("write pending failed", e);
+          break;
+        }
 
+      } while (!overseer.isClosed() && !overseer.getZkStateReader().getZkClient().isClosed());
+    });
   }
 
   private void write(String coll) throws KeeperException.BadVersionException {
 
     if (log.isDebugEnabled()) {
-      log.debug("writePendingUpdates {}", cs);
+      log.debug("writePendingUpdates {}", coll);
     }
 
-    AtomicInteger lastVersion = new AtomicInteger();
     AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
 
     DocCollection collection = cs.get(coll);
@@ -359,133 +328,104 @@ public class ZkStateWriter {
       return;
     }
 
-    if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
-    Integer version = null;
-    if (dirtyStructure.contains(collection.getName())) {
-      log.info("process collection {}", collection);
-      ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
-        if (reentrantLock == null) {
-          ColState colState = new ColState();
-          return colState;
-        }
-        return reentrantLock;
-      });
-
-      collState.collLock.lock();
-      try {
-        collState.throttle.minimumWaitBetweenActions();
-        collState.throttle.markAttemptingAction();
-        String name = collection.getName();
-        String path = ZkStateReader.getCollectionPath(collection.getName());
-        String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
-        // log.info("process collection {} path {}", collection.getName(), path);
-        Stat existsStat = null;
-        if (log.isTraceEnabled()) log.trace("process {}", collection);
-        try {
-          // log.info("get data for {}", name);
-
-          //  log.info("got data for {} {}", name, data.length);
+    log.debug("process collection {}", coll);
+    ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
+      if (reentrantLock == null) {
+        ColState colState = new ColState();
+        return colState;
+      }
+      return reentrantLock;
+    });
+    collState.collLock.lock();
+    try {
+      collection = cs.get(coll);
 
-          try {
+      if (collection == null) {
+        return;
+      }
 
-            if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
-            byte[] data = Utils.toJSON(singletonMap(name, collection));
-            Integer v = trackVersions.get(collection.getName());
+      if (log.isTraceEnabled()) log.trace("check collection {} {} {}", collection, dirtyStructure, dirtyState);
 
-            if (v != null) {
-              //log.info("got version from cache {}", v);
-              version = v;
-            } else {
-              version = 0;
-            }
-            lastVersion.set(version);
-            if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+      //  collState.throttle.minimumWaitBetweenActions();
+      //  collState.throttle.markAttemptingAction();
+      String name = collection.getName();
+      String path = ZkStateReader.getCollectionPath(collection.getName());
+      String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
 
-            reader.getZkClient().setData(path, data, version, true, false);
-            if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+      if (log.isTraceEnabled()) log.trace("process {}", collection);
+      try {
 
-            reader.getZkClient().setData(pathSCN, null, -1, true, false);
+        if (dirtyStructure.contains(name)) {
+          if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
 
-            dirtyStructure.remove(collection.getName());
+          byte[] data = Utils.toJSON(singletonMap(name, collection));
 
-            ZkNodeProps updates = stateUpdates.get(collection.getName());
-            if (updates != null) {
-              updates.getProperties().clear();
-              //(collection, updates);
-              //dirtyState.remove(collection.getName());
-            }
+          if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
 
-            trackVersions.put(collection.getName(), version + 1);
+          Integer finalVersion = collection.getZNodeVersion();
+          dirtyStructure.remove(collection.getName());
+          if (reader == null) {
+            log.error("read not initialized in zkstatewriter");
+          }
+          if (reader.getZkClient() == null) {
+            log.error("zkclient not initialized in zkstatewriter");
+          }
 
+          Stat stat;
+          try {
+            stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
+            collection.setZnodeVersion(finalVersion + 1);
+            if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
           } catch (KeeperException.NoNodeException e) {
-            if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
-            lastVersion.set(-1);
-            trackVersions.remove(collection.getName());
-            stateUpdates.remove(collection.getName());
-            cs.remove(collection);
-            // likely deleted
+            log.debug("No node found for state.json", e);
 
           } catch (KeeperException.BadVersionException bve) {
-            log.info("Tried to update state.json ({}) with bad version", collection);
-            //lastFailedException.set(bve);
-            //failedUpdates.put(collection.getName(), collection);
-            // Stat estate = reader.getZkClient().exists(path, null);
-            trackVersions.remove(collection.getName());
-            Stat stat = reader.getZkClient().exists(path, null, false, false);
-            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
-            if (!overseer.isClosed() && stat != null) {
-              trackVersions.put(collection.getName(), stat.getVersion());
-            }
+            stat = reader.getZkClient().exists(path, null, false, false);
+            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, finalVersion, stat != null ? stat.getVersion() : "null");
+
             throw bve;
           }
 
-        } catch (KeeperException.BadVersionException bve) {
-          badVersionException.set(bve);
-        } catch (InterruptedException | AlreadyClosedException e) {
-          log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+          reader.getZkClient().setData(pathSCN, null, -1, true, false);
 
-        } catch (Exception e) {
-          log.error("Failed processing update=" + collection, e);
-        }
-      } finally {
-        collState.collLock.unlock();
-      }
-    }
+          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          if (updates != null) {
+            updates.getProperties().clear();
+          }
 
-    if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
-      ZkNodeProps updates = stateUpdates.get(collection.getName());
-      if (updates != null) {
-        try {
-          writeStateUpdates(collection, updates);
-        } catch (Exception e) {
-          log.error("exception writing state updates", e);
+        } else if (dirtyState.contains(collection.getName())) {
+          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          if (updates != null) {
+            try {
+              writeStateUpdates(collection, updates);
+            } catch (Exception e) {
+              log.error("exception writing state updates", e);
+            }
+          }
         }
-      }
-    }
 
-    //removeCollections.forEach(c ->  removeCollection(c));
+      } catch (KeeperException.BadVersionException bve) {
+        badVersionException.set(bve);
+      } catch (InterruptedException | AlreadyClosedException e) {
+        log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+        throw new AlreadyClosedException(e);
 
-    if (badVersionException.get() != null) {
-      throw badVersionException.get();
-    }
-
-    //log.info("Done with successful cluster write out");
+      } catch (Exception e) {
+        log.error("Failed processing update=" + collection, e);
+      }
 
-    //    } finally {
-    //      writeLock.unlock();
-    //    }
-    // MRM TODO: - harden against failures and exceptions
+      if (badVersionException.get() != null) {
+        throw badVersionException.get();
+      }
 
-    //    if (log.isDebugEnabled()) {
-    //      log.debug("writePendingUpdates() - end - New Cluster State is: {}", newClusterState);
-    //    }
+    } finally {
+      collState.collLock.unlock();
+    }
   }
 
   private void writeStateUpdates(DocCollection collection, ZkNodeProps updates) throws KeeperException, InterruptedException {
     String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
-    if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
+    if (log.isDebugEnabled()) log.debug("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
     try {
       reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
     } catch (KeeperException.NoNodeException e) {
@@ -519,14 +459,6 @@ public class ZkStateWriter {
     });
     collState.collLock.lock();
     try {
-      stateUpdates.remove(collection);
-      cs.remove(collection);
-      assignMap.remove(collection);
-      trackVersions.remove(collection);
-      dirtyStructure.remove(collection);
-      dirtyState.remove(collection);
-      ZkNodeProps message = new ZkNodeProps("name", collection);
-      cs.remove(collection);
       Long id = null;
       for (Map.Entry<Long, String> entry : idToCollection.entrySet()) {
         if (entry.getValue().equals(collection)) {
@@ -536,6 +468,12 @@ public class ZkStateWriter {
       }
       if (id != null) {
         idToCollection.remove(id);
+        stateUpdates.remove(collection);
+        cs.remove(collection);
+        assignMap.remove(collection);
+        dirtyStructure.remove(collection);
+        dirtyState.remove(collection);
+        cs.remove(collection);
       }
     } catch (Exception e) {
       log.error("", e);
@@ -557,17 +495,17 @@ public class ZkStateWriter {
 
 
       int id = docAssign.replicaAssignCnt.incrementAndGet();
-      log.info("assign id={} for collection={} slice={}", id, collection, shard);
+      log.debug("assign id={} for collection={} slice={}", id, collection, shard);
       return id;
     }
 
     int id = docAssign.replicaAssignCnt.incrementAndGet();
-    log.info("assign id={} for collection={} slice={}", id, collection, shard);
+    log.debug("assign id={} for collection={} slice={}", id, collection, shard);
     return id;
   }
 
   public void init() {
-    reader.forciblyRefreshAllClusterStateSlow();
+
     ClusterState readerState = reader.getClusterState();
     if (readerState != null) {
       cs.putAll(readerState.copy().getCollectionsMap());
@@ -575,31 +513,44 @@ public class ZkStateWriter {
 
     long[] highId = new long[1];
     cs.values().forEach(collection -> {
-      if (collection.getId() > highId[0]) {
-        highId[0] = collection.getId();
-      }
+      String collectionName = collection.getName();
+      ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+        if (colState == null) {
+          ColState cState = new ColState();
+          return cState;
+        }
+        return colState;
+      });
+      collState.collLock.lock();
+      try {
 
-      idToCollection.put(collection.getId(), collection.getName());
+        if (collection.getId() > highId[0]) {
+          highId[0] = collection.getId();
+        }
 
-      trackVersions.put(collection.getName(), collection.getZNodeVersion());
+        idToCollection.put(collection.getId(), collection.getName());
 
-      DocAssign docAssign = new DocAssign();
-      docAssign.name = collection.getName();
-      assignMap.put(docAssign.name, docAssign);
-      int max = 1;
-      Collection<Slice> slices = collection.getSlices();
-      for (Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-
-        for (Replica replica : replicas) {
-          Matcher matcher = Assign.pattern.matcher(replica.getName());
-          if (matcher.matches()) {
-            int val = Integer.parseInt(matcher.group(1));
-            max = Math.max(max, val);
+
+        DocAssign docAssign = new DocAssign();
+        docAssign.name = collection.getName();
+        assignMap.put(docAssign.name, docAssign);
+        int max = 1;
+        Collection<Slice> slices = collection.getSlices();
+        for (Slice slice : slices) {
+          Collection<Replica> replicas = slice.getReplicas();
+
+          for (Replica replica : replicas) {
+            Matcher matcher = Assign.pattern.matcher(replica.getName());
+            if (matcher.matches()) {
+              int val = Integer.parseInt(matcher.group(1));
+              max = Math.max(max, val);
+            }
           }
         }
+        docAssign.replicaAssignCnt.set(max);
+      } finally {
+        collState.collLock.unlock();
       }
-      docAssign.replicaAssignCnt.set(max);
     });
 
     ID.set(highId[0]);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5cc13bb..6775b88 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -156,6 +156,7 @@ public class CoreContainer implements Closeable {
   final SolrCores solrCores = new SolrCores(this);
   private volatile boolean startedLoadingCores;
   private volatile boolean loaded;
+  private volatile SolrClient metricsHistoryHandlerCloudClient;
 
   public static class CoreLoadFailure {
 
@@ -877,7 +878,7 @@ public class CoreContainer implements Closeable {
 
             List<Replica> replicas = c.getReplicas();
             for (Replica replica : replicas) {
-              log.debug("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
+              log.trace("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
               if (replica.getNodeName().equals(nodeName)) {
                 if (replica.getState().equals(State.ACTIVE)) {
                   if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={} replica={}", replica.getState(), replica.getNodeName(), nodeName, replica);
@@ -998,20 +999,20 @@ public class CoreContainer implements Closeable {
     }
     String name;
     SolrCloudManager cloudManager;
-    SolrClient client;
+
     if (isZooKeeperAware()) {
       name = getZkController().getNodeName();
       cloudManager = getZkController().getSolrCloudManager();
-      client = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
+      metricsHistoryHandlerCloudClient = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
           .withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().build();
-      ((CloudHttp2SolrClient)client).connect();
+      ((CloudHttp2SolrClient) metricsHistoryHandlerCloudClient).connect();
     } else {
       name = getNodeConfig().getNodeName();
       if (name == null || name.isEmpty()) {
         name = "127.0.0.1";
       }
       cloudManager = null;
-      client = new EmbeddedSolrServer();
+      metricsHistoryHandlerCloudClient = new EmbeddedSolrServer();
       // enable local metrics unless specifically set otherwise
       if (!initArgs.containsKey(MetricsHistoryHandler.ENABLE_NODES_PROP)) {
         initArgs.put(MetricsHistoryHandler.ENABLE_NODES_PROP, true);
@@ -1020,8 +1021,7 @@ public class CoreContainer implements Closeable {
         initArgs.put(MetricsHistoryHandler.ENABLE_REPLICAS_PROP, true);
       }
     }
-    metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler,
-        client, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
+    metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler, metricsHistoryHandlerCloudClient, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
     containerHandlers.put(METRICS_HISTORY_PATH, metricsHistoryHandler);
     metricsHistoryHandler.initializeMetrics(solrMetricsContext, METRICS_HISTORY_PATH);
   }
@@ -1169,7 +1169,7 @@ public class CoreContainer implements Closeable {
       if (auditloggerPlugin != null) {
         auditPlugin = auditloggerPlugin.plugin;
       }
-
+      closer.collect(metricsHistoryHandlerCloudClient);
       closer.collect(authPlugin);
       closer.collect(authenPlugin);
       closer.collect(auditPlugin);
@@ -1410,7 +1410,7 @@ public class CoreContainer implements Closeable {
   @SuppressWarnings("resource")
   private SolrCore createFromDescriptor(CoreDescriptor dcore, boolean newCollection) {
 
-    log.info("createFromDescriptor {} {}", dcore, newCollection);
+    log.debug("createFromDescriptor {} {}", dcore, newCollection);
 
     if (isShutDown()) {
       throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
@@ -1421,10 +1421,15 @@ public class CoreContainer implements Closeable {
     boolean registered = false;
     try {
       MDCLoggingContext.setCoreName(dcore.getName());
+
       StopWatch timeValidateCoreNameLoadConfigSet = new StopWatch(dcore.getName() + "-validateCoreNameLoadConfigSet");
 
       SolrIdentifierValidator.validateCoreName(dcore.getName());
 
+      if (isZooKeeperAware()) {
+        getZkController().getZkStateReader().registerCore(dcore.getCollectionName(), dcore.getName());
+      }
+
       ConfigSet coreConfig = coreConfigService.loadConfigSet(dcore);
       dcore.setConfigSetTrusted(coreConfig.isTrusted());
       if (log.isInfoEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 17c4351..f380a0f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1769,8 +1769,11 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       }
     }
 
-    void doClose () {
-
+    synchronized void doClose () {
+      if (refCount.get() == -1) {
+        log.warn("SolrCore is already closed {}", name);
+        return;
+      }
       try {
         if (closing) {
           this.closing = true;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6a8e1fa..5f9f90e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1077,7 +1077,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX);
       if (m.isEmpty()) {
         throw new SolrException(ErrorCode.BAD_REQUEST,
-            formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
+            formatString("no supported values provided {0} {1}", req.getParams(), CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
       }
       copy(req.getParams().required(), m, COLLECTION_PROP);
       for (Map.Entry<String, Object> entry : m.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
index 05f81bf..f4cf0b8 100644
--- a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
+++ b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
@@ -222,7 +222,7 @@ public abstract class ManagedResourceStorage {
         throw new SolrException(ErrorCode.SERVER_ERROR, errMsg, exc);
       }
       
-      log.info("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
+      log.debug("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
     }    
     
     @Override
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index d912d62..3943f82 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -101,7 +101,7 @@ public final class CommitTracker implements Runnable, Closeable {
     scheduler.setRemoveOnCancelPolicy(true);
     scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-    log.info("{} AutoCommit: {}", name, this);
+    log.debug("{} AutoCommit: {}", name, this);
     assert ObjectReleaseTracker.track(this);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index e17ef6a..a0c772f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -144,9 +144,8 @@ UpdateHandler implements SolrInfoBean, Closeable {
         ulog.clearLog(core, ulogPluginInfo);
       }
 
-      if (log.isInfoEnabled()) {
-        log.info("Using UpdateLog implementation: {}", ulog.getClass().getName());
-      }
+      log.debug("Using UpdateLog implementation: {}", ulog.getClass().getName());
+
       ulog.init(ulogPluginInfo);
       ulog.init(this, core);
     } else {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index dfb3679..b9c6461 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -391,7 +391,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
           "Number of version buckets must be greater than 0!");
 
-    log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
+    log.debug("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
         dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep, numVersionBuckets);
   }
 
@@ -2386,7 +2386,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
       if (maxVersion == 0L) {
         maxVersion = versions.getNewClock();
-        log.info("Could not find max version in index or recent updates, using new clock {}", maxVersion);
+        log.debug("Could not find max version in index or recent updates, using new clock {}", maxVersion);
       }
 
       // seed all version buckets with the highest value from recent and index
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 61a785d..4f9e619 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -277,7 +277,7 @@ public class VersionInfo {
 
     final String versionFieldName = versionField.getName();
 
-    log.info("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
+    log.debug("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
     // if indexed, then we have terms to get the max from
     if (versionField.indexed()) {
       if (versionField.getType().isPointField()) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 36b0bbf..1a2d28a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -250,7 +250,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       } else {
         // drop it
-        log.info("drop docid={}", cmd.getPrintableId());
+        log.debug("drop docid={}", cmd.getPrintableId());
         if (SkyHook.skyHookDoc != null) {
           SkyHook.skyHookDoc.register(cmd.getPrintableId(), "final notice of dropping update, returning from processAdd cmd=" + cmd);
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 620b890..bfd55b3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
 
   @Test
   public void start() throws Exception {
-    int collectionCnt = 40;
+    int collectionCnt = 5;
     List<Future> futures = new ArrayList<>();
     List<Future> indexFutures = new ArrayList<>();
     for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 91fd066..b88e93f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -51,9 +51,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
 //@LuceneTestCase.Nightly
 public class TestCollectionAPI extends ReplicaPropertiesBase {
 
@@ -909,6 +910,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
       params.set("router.name", "implicit");
       params.set("numShards", "1");
       params.set("shards", "invalid@name#with$weird%characters");
+      params.set(WAIT_FOR_FINAL_STATE, "true");
       SolrRequest request = new QueryRequest(params);
       request.setPath("/admin/collections");
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 86c7995..b70b08d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -379,7 +379,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
       String url = zkClientClusterStateProvider.getZkStateReader().getBaseUrlForNodeName(solrNode);
 
       try {
-        GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
+        GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.POST, path, params); // params too long for GET
         try (Http2SolrClient client = new Http2SolrClient.Builder().withHttpClient(httpClient).withBaseUrl(url).markInternalRequest().build()) {
           NamedList<Object> rsp = client.request(request);
           request.response.nl = rsp;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index dc36207..d7d41d4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -215,6 +215,20 @@ public class ClusterState implements JSONWriter.Writable {
     return createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
   }
 
+  public static DocCollection createDocCollectionFromJson(Replica.NodeNameToBaseUrl nodeNameToBaseUrl, int version, byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      return null;
+    }
+    Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes);
+    ClusterState cs = createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
+    if (cs.getCollectionsMap().size() == 0) {
+      return null;
+    }
+    DocCollection docCollection = cs.getCollectionsMap().values().iterator().next();
+    docCollection.setZnodeVersion(version);
+    return docCollection;
+  }
+
   public static ClusterState createFromCollectionMap(Replica.NodeNameToBaseUrl zkStateReader, int version, Map<String, Object> stateMap) {
     Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 814f808..59cd767 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -93,7 +93,7 @@ public class ConnectionManager implements Watcher, Closeable {
       throw new IllegalStateException("You must call start on " + SolrZkClient.class.getName() + " before you can use it");
     }
 
-    if (keeper != null && isClosed && !keeper.getState().isAlive()) {
+    if (keeper != null && (isClosed || !keeper.getState().isAlive())) {
       throw new AlreadyClosedException(this + " SolrZkClient is not currently connected state=" + keeper.getState());
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 3c0326f..b55ce1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -51,7 +51,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
 
-  private final int znodeVersion;
+  private int znodeVersion;
 
   private final String name;
   private final Map<String, Slice> slices;
@@ -462,4 +462,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Map getStateUpdates() {
     return stateUpdates;
   }
+
+  public void setZnodeVersion(int version) {
+    this.znodeVersion = version;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 80eea86..2a48875 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -426,9 +426,11 @@ public class SolrZkClient implements Closeable {
     return setData(path, data, version, retryOnConnLoss, false);
   }
 
-  /**
-   * Returns node's state
-   */
+  public void setData(final String path, final byte data[], final int version, AsyncCallback.StatCallback cb, Object ctx)
+      throws KeeperException, InterruptedException {
+    connManager.getKeeper().setData(path, data, version, cb, ctx);
+  }
+
   public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss, boolean retryOnSessionExpiration)
       throws KeeperException, InterruptedException {
 
@@ -823,21 +825,13 @@ public class SolrZkClient implements Closeable {
     return dataMap;
   }
 
-  public void delete(Collection<String> paths, boolean wait) throws KeeperException {
+  public CountDownLatch delete(Collection<String> paths, boolean wait) throws KeeperException {
     if (log.isDebugEnabled()) log.debug("delete paths {} wait={}", paths, wait);
-    if (paths.size() == 0) {
-      return;
-    }
-    CountDownLatch latch = null;
-    if (wait) {
-      latch = new CountDownLatch(paths.size());
-    }
+    CountDownLatch latch = new CountDownLatch(paths.size());
+
     KeeperException[] ke = new KeeperException[1];
     for (String path : paths) {
       if (log.isDebugEnabled()) log.debug("process path={} connManager={}", path, connManager);
-  
-
-      CountDownLatch finalLatch = latch;
 
       connManager.getKeeper().delete(path, -1, (rc, path1, ctx) -> {
         try {
@@ -855,9 +849,7 @@ public class SolrZkClient implements Closeable {
             }
           }
         } finally {
-          if (wait) {
-            finalLatch.countDown();
-          }
+          latch.countDown();
         }
       }, null);
 
@@ -867,43 +859,26 @@ public class SolrZkClient implements Closeable {
       log.debug("done with all paths, see if wait ... wait={}", wait);
     }
     if (wait) {
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-      boolean success = false;
-      while (!timeout.hasTimedOut() && !isClosed) {
-        if (!connManager.getKeeper().getState().isConnected()) {
-          try {
-            connManager.waitForConnected(30000);
-          } catch (TimeoutException e) {
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-          } catch (InterruptedException e) {
-            ParWork.propagateInterrupt(e);
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-          }
-        }
+      try {
+        boolean success = latch.await(10, TimeUnit.SECONDS);
 
-        if (ke[0] != null) {
-          throw ke[0];
-        }
-        try {
-          success = latch.await(10, TimeUnit.SECONDS);
-          if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
-          if (success) {
-            break;
+        if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+        if (success) {
+          if (ke[0] != null) {
+            throw ke[0];
           }
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          log.error("", e);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
         }
+      } catch (InterruptedException e) {
+        ParWork.propagateInterrupt(e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
     }
 
-    if (ke[0] != null) {
-      throw ke[0];
-    }
     if (log.isDebugEnabled()) {
       log.debug("done with delete {} {}", paths, wait);
     }
+
+    return latch;
   }
 
   // Calls setData for a list of existing paths in parallel
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 12d2671..30de809 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -260,7 +260,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     public boolean canBeRemoved() {
       int refCount = coreRefCount.get();
       int watcherCount = stateWatchers.size();
-      log.debug("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
+      log.trace("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
       return refCount <= 0 && watcherCount <= 0;
     }
 
@@ -449,7 +449,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (log.isDebugEnabled()) {
         log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
       }
-      DocCollection nu = getCollectionLive(this, coll);
+      DocCollection nu = getCollectionLive(coll);
       if (nu == null) return -3;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
         if (updateWatchedCollection(coll, nu, false)) {
@@ -634,13 +634,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   private void constructState(DocCollection collection, String caller) {
 
-    if (log.isDebugEnabled()) log.debug("construct new cluster state on structure change {} {}", caller, collection);
+    if (log.isDebugEnabled()) log.trace("construct new cluster state on structure change {} {}", caller, collection);
 
 
-    if (log.isTraceEnabled()) {
-      log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
+
+    log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
           clusterState.keySet());
-    }
+
 //
 //    watchedCollectionStates.forEach((s, slices) -> {
 //      clusterState.putIfAbsent(s, new ClusterState.CollectionRef(slices));
@@ -689,8 +689,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           LazyCollectionRef docRef = new LazyCollectionRef(coll);
           LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
           if (old == null) {
-            log.debug("Created lazy collection {}", coll);
             clusterState.put(coll, docRef);
+
+            log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
+                watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
           }
         }
       }
@@ -752,7 +754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   private void notifyCloudCollectionsListeners(boolean notifyIfSame) {
-    if (log.isDebugEnabled()) log.debug("Notify cloud collection listeners {}", notifyIfSame);
+    log.trace("Notify cloud collection listeners {}", notifyIfSame);
     Set<String> newCollections;
     Set<String> oldCollections;
     boolean fire = true;
@@ -763,9 +765,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       fire = true;
     }
 
-    if (log.isDebugEnabled()) log.debug("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
+    log.trace("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
     if (fire) {
-
       cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(oldCollections, newCollections));
     }
   }
@@ -807,7 +808,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
         if (shouldFetch) {
           try {
-            DocCollection cdc = getCollectionLive(ZkStateReader.this, collName);
+            DocCollection cdc = getCollectionLive(collName);
             if (cdc != null) {
               cdc.setCreatedLazy();
               lastUpdateTime = System.nanoTime();
@@ -959,7 +960,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public Replica getLeader(String collection, String shard) {
-    return getLeader(getClusterState().getCollection(collection), shard);
+    return getLeader(getCollectionOrNull(collection), shard);
   }
 
   private Replica getLeader(DocCollection docCollection, String shard) {
@@ -1486,18 +1487,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     @Override
     public void close() throws IOException {
       this.closed = true;
-  //    IOUtils.closeQuietly(stateUpdateWatcher);
-//      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-//      if (zk != null) {
-//        try {
-//          zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
-//        } catch (KeeperException.NoWatcherException e) {
-//
-//        } catch (Exception e) {
-//          if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-//        }
-//      }
-//      IOUtils.closeQuietly(stateUpdateWatcher);
     }
 
     private class StateUpdateWatcher implements Watcher, Closeable {
@@ -1511,40 +1500,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       @Override
       public void close() throws IOException {
         this.closed = true;
-//        SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-//        if (zk != null) {
-//          if (stateUpdateWatcher != null) {
-//            try {
-//              zk.removeWatches(getCollectionStateUpdatesPath(coll), stateUpdateWatcher, WatcherType.Any, true);
-//            } catch (KeeperException.NoWatcherException e) {
-//
-//            } catch (Exception e) {
-//              if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-//            }
-//          }
-//        }
       }
 
       @Override
       public void process(WatchedEvent event) {
         if (zkClient.isClosed() || closed) return;
-        if (log.isDebugEnabled()) log.debug("_statupdates event {}", event);
+        log.trace("_statupdates event {}", event);
 
         try {
-
-          //            if (event.getType() == EventType.NodeDataChanged ||
-          //                event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
           collectionStateLock.lock();
           getAndProcessStateUpdates(coll, stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
-          //            }
-
         } catch (AlreadyClosedException e) {
 
         } catch (Exception e) {
           log.error("Unwatched collection: [{}]", coll, e);
         }
       }
-
     }
   }
 
@@ -1783,7 +1754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
   }
 
-  public DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+  public DocCollection getCollectionLive(String coll) {
     log.debug("getCollectionLive {}", coll);
     DocCollection newState;
     try {
@@ -1812,7 +1783,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, boolean live, DocCollection docCollection, ReentrantLock collectionStateLock) throws KeeperException, InterruptedException {
     DocCollection result = null;
     try {
-      log.debug("get and process state updates for {}", coll);
+      log.trace("get and process state updates for {}", coll);
 
       Stat stat;
       try {
@@ -1827,7 +1798,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       if (docCollection != null && docCollection.hasStateUpdates()) {
         int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
-        if (stat.getVersion() < oldVersion) {
+        if (stat.getVersion() <= oldVersion) {
           if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
           return docCollection;
         }
@@ -1854,7 +1825,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
 
       Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
-      if (log.isDebugEnabled()) log.debug("Got additional state updates with version {} {} cs={}", version, m, clusterState);
+      if (log.isDebugEnabled()) log.debug("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
 
       m.remove("_cs_ver_");
       m.put("_ver_", stat.getVersion());
@@ -1862,11 +1833,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         Set<Entry<String,Object>> entrySet = m.entrySet();
 
         if (docCollection != null) {
-          // || (version > docCollection.getZNodeVersion() && clusterState.getZkClusterStateVersion() == -1)) {
-//          if (!docCollection.hasStateUpdates() && version < docCollection.getZNodeVersion()) {
-//            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
-//            return docCollection;
-//          }
+          if (version < docCollection.getZNodeVersion()) {
+            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+            return docCollection;
+          }
 
           if (docCollection.hasStateUpdates()) {
             int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
@@ -1885,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             }
 
             Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
-            if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
+            if (log.isTraceEnabled()) log.trace("Got additional state update {} replica={} id={} ids={}", state == null ? "leader" : state, replica == null ? null : replica.getName(), id, docCollection.getReplicaByIds());
 
             if (replica != null) {
 
@@ -1902,7 +1872,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
                   if (replica.getName().equals(r.getName())) {
                     continue;
                   }
-                  log.debug("process non leader {} {}", r, r.getProperty(LEADER_PROP));
+                  log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
                   if ("true".equals(r.getProperties().get(LEADER_PROP))) {
                     log.debug("remove leader prop {}", r);
                     Map<String,Object> props = new HashMap<>(r.getProperties());
@@ -1912,7 +1882,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
                   }
                 }
               } else if (state != null && !properties.get(ZkStateReader.STATE_PROP).equals(state.toString())) {
-                if (log.isDebugEnabled()) log.debug("std state, set to {}", state);
+                log.trace("std state, set to {}", state);
                 properties.put(ZkStateReader.STATE_PROP, state.toString());
                 if ("true".equals(properties.get(LEADER_PROP))) {
                   properties.remove(LEADER_PROP);
@@ -1921,7 +1891,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
               Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
 
-              if (log.isDebugEnabled()) log.debug("add new replica {}", newReplica);
+              log.trace("add new replica {}", newReplica);
 
               replicasMap.put(replica.getName(), newReplica);
 
@@ -1933,7 +1903,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
               Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
               newSlices.put(slice.getName(), newSlice);
 
-              if (log.isDebugEnabled()) log.debug("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+              log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
 
               DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), m);
               docCollection = newDocCollection;
@@ -1957,7 +1927,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             //
             //            });
 
-            if (log.isDebugEnabled()) log.debug("Set a new clusterstate based on update diff {}", result);
+            log.trace("Set a new clusterstate based on update diff {}", result);
 
             updateWatchedCollection(coll, result, false);
             constructState(result);
@@ -2001,39 +1971,41 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
       }
 
-      if (lazyCollectionStates.containsKey(coll)) {
-        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
-        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
-        if (cachedCollection != null) {
-          int localVersion = cachedCollection.getZNodeVersion();
-          if (cachedCollection.hasStateUpdates()) {
-            if (localVersion == version) {
-              return cachedCollection;
-            }
-          } else {
-            if (localVersion == version) {
-              return cachedCollection;
-            }
-          }
-        }
-
-      }
+//      if (lazyCollectionStates.containsKey(coll)) {
+//        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+//        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+//        if (cachedCollection != null) {
+//          int localVersion = cachedCollection.getZNodeVersion();
+//          if (cachedCollection.hasStateUpdates()) {
+//            if (localVersion == version) {
+//              return cachedCollection;
+//            }
+//          } else {
+//            if (localVersion == version) {
+//              return cachedCollection;
+//            }
+//          }
+//        }
+//      }
 
     } else {
       return null;
     }
-    if (log.isDebugEnabled()) log.debug("getting latest state.json");
+    log.debug("getting latest state.json");
     Stat stat = new Stat();
     byte[] data;
     try {
       data = zkClient.getData(collectionPath, null, stat, true);
     } catch (NoNodeException e) {
+      log.debug("no state.json znode found");
+      return null;
+    }
+
+    if (data == null) {
+      log.debug("no data found at state.json node");
       return null;
     }
-    if (data == null) return null;
-    ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
-    ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
-    DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+    DocCollection docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
 
     return docCollection;
   }
@@ -2084,6 +2056,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
+        v.coreRefCount.incrementAndGet();
         return v;
       }
       v.coreRefCount.incrementAndGet();
@@ -2187,11 +2160,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
-        log.debug("creating new Collection State watcher for {}", collection);
+        log.debug("creating CollectionStateWatcher and refreshing for {}", collection);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
-        log.debug("creating watches and refreshing state watcher for {}", collection);
+
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
@@ -2202,19 +2175,26 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     });
 
     DocCollection state = getCollectionOrNull(collection);
-//
-//    if (state == null) {
-//      forciblyRefreshClusterStateSlow(state.getName());
-//      state = clusterState.getCollectionOrNull(collection);
-//    }
+    boolean remove;
+    try {
+      remove = docCollectionWatcher.onStateChanged(state);
+    } catch (Exception e) {
+      log.error("Exception running DocCollectionWatcher collection={}", collection, e);
+      remove = true;
+    }
+    if (remove) {
+      removeDocCollectionWatcher(collection, docCollectionWatcher);
+    }
 
-    if (docCollectionWatcher.onStateChanged(state) == true) {
-      removeDocCollectionWatcher(collection, docCollectionWatcher);
-    }
+  }
 
+  public DocCollection getCollection(String collection) {
+    DocCollection coll = getCollectionOrNull(collection);
+    if (coll == null) throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection : " + collection + " collections=" + clusterState.keySet());
+    return coll;
   }
 
-  private DocCollection getCollectionOrNull(String collection) {
+  public DocCollection getCollectionOrNull(String collection) {
     ClusterState.CollectionRef coll = clusterState.get(collection);
     if (coll == null) return null;
     return coll.get();
@@ -2275,11 +2255,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
-    log.info("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
+    log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
         clusterState.size());
 
-    log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
-        clusterState.keySet());
     assert collection != null;
     CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
 
@@ -2378,7 +2356,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @see #registerDocCollectionWatcher
    */
   public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
-    if (log.isDebugEnabled()) log.debug("remove watcher for collection {}", collection);
+    log.debug("remove watcher for collection {}", collection);
 
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
@@ -2390,16 +2368,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (v == null) return null;
       v.stateWatchers.remove(watcher);
       if (v.canBeRemoved()) {
-        log.debug("no longer watch collection {}", collection);
+        log.trace("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
         clusterState.put(collection, docRef);
-//        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
-//        if (stateWatcher != null) {
-//          IOUtils.closeQuietly(stateWatcher);
-//          stateWatcher.removeWatch();
-//        }
+        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+        if (stateWatcher != null) {
+          stateWatcher.removeWatch();
+          IOUtils.closeQuietly(stateWatcher);
+        }
         reconstructState.set(true);
         return null;
       }
@@ -2529,7 +2507,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   private void notifyStateWatchers(String collection, DocCollection collectionState) {
-    if (log.isDebugEnabled()) log.debug("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
+    log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
 
     try {
       notifications.submit(new Notification(collection, collectionState, collectionWatches));
@@ -2572,7 +2550,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       try (ParWork work = new ParWork(this)) {
         watchers.forEach(watcher -> {
           work.collect("", () -> {
-            log.debug("Notify DocCollectionWatcher {} {}", watcher, collectionState);
+            log.trace("Notify DocCollectionWatcher {} {}", watcher, collectionState);
             try {
               if (watcher.onStateChanged(collectionState)) {
                 removeDocCollectionWatcher(collection, watcher);
@@ -2875,7 +2853,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         return false;
       Collection<Slice> activeSlices = collectionState.getActiveSlices();
 
-      log.debug("active slices expected={} {} {} allSlices={}",expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
+      log.trace("active slices expected={} {} {} allSlices={}", expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
 
       if (!exact) {
         if (activeSlices.size() < expectedShards) {
@@ -2888,16 +2866,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
 
       if (expectedReplicas == 0 && !exact) {
-        log.info("0 replicas expected and found, return");
+        log.debug("0 replicas expected and found, return");
         return true;
       }
 
       int activeReplicas = 0;
       for (Slice slice : activeSlices) {
         Replica leader = slice.getLeader();
-        log.info("slice is {} and leader is {}", slice.getName(), leader);
+        log.trace("slice is {} and leader is {}", slice.getName(), leader);
         if (leader == null) {
-          log.info("slice={}", slice);
+          log.debug("slice={}", slice);
           return false;
         }
         for (Replica replica : slice) {
@@ -2905,7 +2883,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             activeReplicas++;
           }
         }
-        log.info("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
+        log.trace("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
       }
       if (!exact) {
         if (activeReplicas >= expectedReplicas) {
@@ -2934,7 +2912,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   protected void checkShardConsistency(String collection, boolean verbose)
       throws Exception {
 
-    Set<String> theShards = getClusterState().getCollection(collection).getSlicesMap().keySet();
+    Set<String> theShards = getCollection(collection).getSlicesMap().keySet();
     String failMessage = null;
     for (String shard : theShards) {
       String shardFailMessage = checkShardConsistency(collection, shard, false, verbose);
@@ -2970,7 +2948,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     if (verbose) System.err.println("__________________________\n");
     int cnt = 0;
 
-    DocCollection coll = getClusterState().getCollection(collection);
+    DocCollection coll = getCollection(collection);
 
     Slice replicas = coll.getSlice(shard);