You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/13 03:56:33 UTC

[lucene-solr] branch reference_impl updated (fe8df05 -> 2141e43)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


 discard fe8df05  @1466 Quick cleanup for visitors - my office needs the same.
     new a048094  @1466 Quick cleanup for visitors - my office needs the same.
     new 2141e43  @1467 Cleanup after stress work.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force), leaving a
repository history that looks something like this:

 * -- * -- B -- O -- O -- O   (fe8df05)
            \
             N -- N -- N   refs/heads/reference_impl (2141e43)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/solr/cloud/LeaderElector.java  |   4 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   | 233 +++-----
 .../solr/cloud/OverseerTaskExecutorTask.java       | 102 ----
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  22 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   2 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |  46 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  24 +-
 .../apache/solr/cloud/api/collections/Assign.java  |   2 +-
 .../api/collections/CollectionCmdResponse.java     |   7 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   6 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |  27 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   4 +-
 .../OverseerCollectionMessageHandler.java          |  12 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   8 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 621 ++++++++++-----------
 .../java/org/apache/solr/core/CoreContainer.java   |  23 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |   7 +-
 .../solr/handler/admin/CollectionsHandler.java     |   2 +-
 .../apache/solr/rest/ManagedResourceStorage.java   |   2 +-
 .../java/org/apache/solr/update/CommitTracker.java |   2 +-
 .../java/org/apache/solr/update/UpdateHandler.java |   5 +-
 .../src/java/org/apache/solr/update/UpdateLog.java |   4 +-
 .../java/org/apache/solr/update/VersionInfo.java   |   2 +-
 .../CreateCollectionsIndexAndRestartTest.java      |   2 +-
 .../cloud/api/collections/TestCollectionAPI.java   |   4 +-
 .../solrj/impl/SolrClientNodeStateProvider.java    |   2 +-
 .../org/apache/solr/common/cloud/ClusterState.java |  14 +
 .../solr/common/cloud/ConnectionManager.java       |   2 +-
 .../apache/solr/common/cloud/DocCollection.java    |   6 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  65 +--
 .../apache/solr/common/cloud/ZkStateReader.java    | 182 +++---
 34 files changed, 605 insertions(+), 845 deletions(-)
 delete mode 100644 solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java


[lucene-solr] 02/02: @1467 Cleanup after stress work.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 2141e43b9b9acc0d6480fd236a9f903841420b67
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Mar 12 21:56:02 2021 -0600

    @1467 Cleanup after stress work.
    
    Took 13 minutes
---
 .../java/org/apache/solr/cloud/LeaderElector.java  |   4 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   | 233 +++-----
 .../solr/cloud/OverseerTaskExecutorTask.java       | 102 ----
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  22 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   2 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |  46 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  24 +-
 .../apache/solr/cloud/api/collections/Assign.java  |   2 +-
 .../api/collections/CollectionCmdResponse.java     |   7 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   6 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |  27 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   4 +-
 .../OverseerCollectionMessageHandler.java          |  12 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   8 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 621 ++++++++++-----------
 .../java/org/apache/solr/core/CoreContainer.java   |  23 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |   7 +-
 .../solr/handler/admin/CollectionsHandler.java     |   2 +-
 .../apache/solr/rest/ManagedResourceStorage.java   |   2 +-
 .../java/org/apache/solr/update/CommitTracker.java |   2 +-
 .../java/org/apache/solr/update/UpdateHandler.java |   5 +-
 .../src/java/org/apache/solr/update/UpdateLog.java |   4 +-
 .../java/org/apache/solr/update/VersionInfo.java   |   2 +-
 .../CreateCollectionsIndexAndRestartTest.java      |   2 +-
 .../cloud/api/collections/TestCollectionAPI.java   |   4 +-
 .../solrj/impl/SolrClientNodeStateProvider.java    |   2 +-
 .../org/apache/solr/common/cloud/ClusterState.java |  14 +
 .../solr/common/cloud/ConnectionManager.java       |   2 +-
 .../apache/solr/common/cloud/DocCollection.java    |   6 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  65 +--
 .../apache/solr/common/cloud/ZkStateReader.java    | 182 +++---
 34 files changed, 605 insertions(+), 845 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 8ac4d79..71873ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -206,7 +206,7 @@ public class LeaderElector implements Closeable {
           try {
             String watchedNode = holdElectionPath + "/" + toWatch;
 
-            log.info("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
+            log.debug("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
 
             ElectionWatcher oldWatcher = watcher;
             if (oldWatcher != null) {
@@ -405,7 +405,7 @@ public class LeaderElector implements Closeable {
           leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
         }
 
-        log.info("Joined leadership election with path: {}", leaderSeqPath);
+        log.debug("Joined leadership election with path: {}", leaderSeqPath);
         context.leaderSeqPath = leaderSeqPath;
         state = JOIN;
         break;
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index aec7014..5ae5e4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -74,6 +74,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -705,16 +706,8 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data, false);
   }
 
-  public Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws InterruptedException {
-    if (log.isDebugEnabled()) log.debug("processQueueItem {}", collStateUpdates);
-
-    Future future = new OverseerTaskExecutorTask(getCoreContainer(), collStateUpdates).run();
-
-    return future;
-  }
-
   public Future writePendingUpdates(String collection) {
-    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
+    return zkStateWriter.writePendingUpdates(collection);
   }
 
   private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -812,15 +805,12 @@ public class Overseer implements SolrCloseable {
 
     public void start() throws KeeperException, InterruptedException {
       if (closed) return;
-      ourLock.lock();
-      try {
-        if (closed) return;
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
-        startItems = super.getItems();
-        log.info("Overseer found entries on start {}", startItems);
+
+      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+      startItems = super.getItems();
+      log.info("Overseer found entries on start {}", startItems);
+      if (startItems.size() > 0) {
         processQueueItems(startItems, true);
-      } finally {
-        ourLock.unlock();
       }
     }
 
@@ -828,7 +818,9 @@ public class Overseer implements SolrCloseable {
     protected void processQueueItems(List<String> items, boolean onStart) {
       if (closed) return;
       List<String> fullPaths = new ArrayList<>(items.size());
+      CountDownLatch delCountDownLatch = null;
       ourLock.lock();
+      String forceWrite = null;
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -836,33 +828,22 @@ public class Overseer implements SolrCloseable {
         }
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
-        List<StateEntry> shardStateCollections = null;
-        Set<String> scollections = null;
-        List<Future> futures = new ArrayList<>();
-        for (byte[] item : data.values()) {
-          final ZkNodeProps message = ZkNodeProps.load(item);
-          try {
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (onStart) {
-              if (operation.equals("state")) {
-                message.getProperties().remove(OverseerAction.DOWNNODE);
-                if (message.getProperties().size() == 1) {
-                  continue;
-                }
-              }
-            }
 
-            // hack
-            if (operation.equals("updateshardstate")) {
-              if (scollections == null) {
-                scollections = new HashSet<>();
-              }
-              scollections.add(message.getStr("collection"));
+        if (fullPaths.size() > 0) {
+          if (!zkController.getZkClient().isClosed()) {
+            try {
+              delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
+            } catch (Exception e) {
+              log.warn("Failed deleting processed items", e);
             }
+          }
+        }
 
-            if (shardStateCollections == null) {
-              shardStateCollections = new ArrayList<>();
-            }
+        final List<StateEntry> shardStateCollections = new ArrayList<>();
+
+        for (byte[] item : data.values()) {
+          final ZkNodeProps message = ZkNodeProps.load(item);
+          try {
             StateEntry entry = new StateEntry();
             entry.message = message;
             shardStateCollections.add(entry);
@@ -872,8 +853,9 @@ public class Overseer implements SolrCloseable {
           }
         }
         Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates = new HashMap<>();
-        try {
-          for (Overseer.StateEntry sentry : shardStateCollections) {
+
+        for (Overseer.StateEntry sentry : shardStateCollections) {
+          try {
             ZkNodeProps stateUpdateMessage = sentry.message;
             final String op = stateUpdateMessage.getStr(StatePublisher.OPERATION);
             OverseerAction overseerAction = OverseerAction.get(op);
@@ -888,34 +870,9 @@ public class Overseer implements SolrCloseable {
 
                 for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
                   if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
-                    continue;
-                  } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
-                    continue;
-                  } else {
-                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
-                    String id = stateUpdateEntry.getKey();
-
-                    String stateString = (String) stateUpdateEntry.getValue();
-                    if (log.isDebugEnabled()) {
-                      log.debug("stateString={}", stateString);
-                    }
-                    String collId = id.substring(0, id.indexOf('-'));
-                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
-                    if (updates == null) {
-                      updates = new ArrayList<>();
-                      collStateUpdates.put(collId, updates);
+                    if (onStart) {
+                      continue;
                     }
-
-                    ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
-                    update.id = id;
-                    update.state = stateString;
-                    updates.add(update);
-
-                  }
-                }
-
-                for (Map.Entry<String,Object> m : stateUpdateMessage.getProperties().entrySet()) {
-                  if (OverseerAction.DOWNNODE.equals(OverseerAction.get(m.getKey()))) {
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
                       List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -925,7 +882,7 @@ public class Overseer implements SolrCloseable {
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
-                        if (replica.getNodeName().equals(m.getValue())) {
+                        if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
                           if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
@@ -934,8 +891,7 @@ public class Overseer implements SolrCloseable {
                         }
                       }
                     });
-                  }
-                  if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(m.getKey()))) {
+                  } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
                       List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -945,7 +901,7 @@ public class Overseer implements SolrCloseable {
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
-                        if (replica.getNodeName().equals(m.getValue())) {
+                        if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
                           if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
@@ -954,6 +910,25 @@ public class Overseer implements SolrCloseable {
                         }
                       }
                     });
+                  } else {
+                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
+                    String id = stateUpdateEntry.getKey();
+
+                    String stateString = (String) stateUpdateEntry.getValue();
+                    if (log.isDebugEnabled()) {
+                      log.debug("stateString={}", stateString);
+                    }
+                    String collId = id.substring(0, id.indexOf('-'));
+                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                    if (updates == null) {
+                      updates = new ArrayList<>();
+                      collStateUpdates.put(collId, updates);
+                    }
+
+                    ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+                    update.id = id;
+                    update.state = stateString;
+                    updates.add(update);
                   }
                 }
 
@@ -964,27 +939,29 @@ public class Overseer implements SolrCloseable {
               //          case REMOVEROUTINGRULE:
               //            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
               case UPDATESHARDSTATE:  // MRM TODO: look at how we handle this and make it so it can use StatePublisher
-                  String collection = stateUpdateMessage.getStr("collection");
-                  stateUpdateMessage.getProperties().remove("collection");
-                  stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
-                  Long collIdLong = zkStateWriter.getCS().get(collection).getId();
-                  if (collIdLong != null) {
-                    String collId = Long.toString(collIdLong);
-                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
-                    if (updates == null) {
-                      updates = new ArrayList<>();
-                      collStateUpdates.put(collId, updates);
-                    }
+                String collection = stateUpdateMessage.getStr("collection");
+                stateUpdateMessage.getProperties().remove("collection");
+                stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
+                Long collIdLong = zkStateWriter.getCS().get(collection).getId();
+                if (collIdLong != null) {
+                  String collId = Long.toString(collIdLong);
+                  List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                  if (updates == null) {
+                    updates = new ArrayList<>();
+                    collStateUpdates.put(collId, updates);
+                  }
 
-                    if (collIdLong != null) {
-                      for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
-                        ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
-                        update.sliceState = (String) stateUpdateEntry.getValue();
-                        update.sliceName = stateUpdateEntry.getKey();
-                        updates.add(update);
-                      }
+                  if (collIdLong != null) {
+                    for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
+                      // MRM TODO: slice state should be done like replica state, this is a hack
+                      forceWrite = collection;
+                      ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+                      update.sliceState = (String) stateUpdateEntry.getValue();
+                      update.sliceName = stateUpdateEntry.getKey();
+                      updates.add(update);
                     }
                   }
+                }
 
                 break;
 
@@ -992,61 +969,40 @@ public class Overseer implements SolrCloseable {
                 throw new RuntimeException("unknown operation:" + op + " contents:" + stateUpdateMessage.getProperties());
 
             }
+          } catch (Exception e) {
+            log.error("Overseer state update queue processing failed", e);
+            continue;
           }
-
-        } catch (Exception e) {
-          log.error("Overseer state update queue processing failed", e);
         }
 
-        Future future = null;
         try {
-          future = overseer.processQueueItem(collStateUpdates);
+          getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
         } catch (Exception e) {
           log.error("Overseer state update queue processing failed", e);
         }
 
-        try {
-          future.get();
-        } catch (Exception e) {
-          log.error("failed waiting for enqueued updates", e);
-        }
-
-        futures.clear();
         Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
         for (String collection : collections) {
-          futures.add(overseer.writePendingUpdates(collection));
+          overseer.writePendingUpdates(collection);
         }
 
-        for (Future f : futures) {
-          try {
-            f.get();
-          } catch (Exception e) {
-            log.error("failed waiting for enqueued updates", e);
-          }
-        }
-        futures.clear();
-        if (scollections != null) {
-          for (String collection : scollections) {
-            futures.add(overseer.writePendingUpdates(collection));
-          }
+        if (collections.size() == 0 && forceWrite != null) {
+          overseer.writePendingUpdates(forceWrite);
         }
-        //        for (Future future : futures) {
-        //          try {
-        //            future.get();
-        //          } catch (Exception e) {
-        //            log.error("failed waiting for enqueued updates", e);
-        //          }
-        //        }
-      } finally {
 
-        if (overseer.zkStateWriter != null) {
-          if (zkController.getZkClient().isAlive()) {
+      } finally {
+        try {
+          if (delCountDownLatch != null) {
             try {
-              zkController.getZkClient().delete(fullPaths, true);
-            } catch (Exception e) {
-              log.warn("Failed deleting processed items", e);
+              boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
+
+              if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+
+            } catch (InterruptedException e) {
+              ParWork.propagateInterrupt(e);
             }
           }
+        } finally {
           ourLock.unlock();
         }
       }
@@ -1085,17 +1041,14 @@ public class Overseer implements SolrCloseable {
     @Override
     public void start() throws KeeperException, InterruptedException {
       if (closed) return;
-      ourLock.lock();
-      try {
-        if (closed) return;
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        startItems = super.getItems();
+      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        log.info("Overseer found entries on start {}", startItems);
+      startItems = super.getItems();
+
+      log.info("Overseer found entries on start {}", startItems);
+      if (startItems.size() > 0) {
         processQueueItems(startItems, true);
-      } finally {
-        ourLock.unlock();
       }
     }
 
@@ -1105,7 +1058,7 @@ public class Overseer implements SolrCloseable {
       ourLock.lock();
       List<String> fullPaths = new ArrayList<>(items.size());
       try {
-        log.info("Found collection queue items {} onStart={}", items, onStart);
+        log.debug("Found collection queue items {} onStart={}", items, onStart);
         for (String item : items) {
           fullPaths.add(path + "/" + item);
         }
@@ -1216,7 +1169,7 @@ public class Overseer implements SolrCloseable {
           } else {
             byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
             completedMap.update(entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1), sdata);
-            log.info("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
+            log.debug("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
           }
 
         } catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
deleted file mode 100644
index e0f5f67..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.cloud.overseer.ZkStateWriter;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.core.CoreContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-public class OverseerTaskExecutorTask {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final ZkController zkController;
-  private final SolrCloudManager cloudManager;
-  private final SolrZkClient zkClient;
-  private final Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates;
-
-  public OverseerTaskExecutorTask(CoreContainer cc, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) {
-    this.zkController = cc.getZkController();
-    this.zkClient = zkController.getZkClient();
-    this.cloudManager = zkController.getSolrCloudManager();
-    this.collStateUpdates = collStateUpdates;
-  }
-
-
-  private Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws Exception {
-    if (log.isDebugEnabled()) log.debug("Consume state update from queue {} {}", collStateUpdates);
-
-    // assert clusterState != null;
-
-    //  if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
-
-
-//    if (log.isDebugEnabled()) log.debug("Queue operation is {}", operation);
-//
-//    if (log.isDebugEnabled()) log.debug("Process message {} {}", message, operation);
-//
-//    if (log.isDebugEnabled()) log.debug("Enqueue message {}", operation);
-    try {
-      return zkController.getOverseer().getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
-    } catch (NullPointerException e) {
-      log.info("Overseer is stopped, won't process message " + zkController.getOverseer());
-      return null;
-    }
-
-  }
-
-
-  public Future run() {
-    if (log.isDebugEnabled()) log.debug("OverseerTaskExecutorTask, going to process message {}", collStateUpdates);
-
-    try {
-      return processQueueItem(collStateUpdates);
-    } catch (Exception e) {
-      log.error("Failed to process message " + collStateUpdates, e);
-    }
-    return null;
-  }
-
-  public static class WriteTask implements Runnable {
-    private final String collection;
-    CoreContainer coreContainer;
-
-    public WriteTask(CoreContainer coreContainer, String collection) {
-      this.collection = collection;
-      this.coreContainer = coreContainer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
-      } catch (NullPointerException e) {
-        if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
-      } catch (Exception e) {
-        log.error("Failed to process pending updates", e);
-      }
-    }
-  }
-}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index efb514c..88e138e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -309,7 +309,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
   @Override
   final public void run() {
     // set request info for logging
-    log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
+    log.debug("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
     try {
       try (SolrCore core = cc.getCore(coreName)) {
         if (core == null) {
@@ -508,7 +508,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
   public final boolean doSyncOrReplicateRecovery(SolrCore core, Replica leader) throws Exception {
-    log.info("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
+    log.debug("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
 
     boolean successfulRecovery = false;
     boolean publishedActive = false;
@@ -555,10 +555,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         if (startingVersions.isEmpty()) {
-          log.info("startupVersions is empty");
+          log.debug("startupVersions is empty");
         } else {
-          if (log.isInfoEnabled()) {
-            log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
+          if (log.isDebugEnabled()) {
+            log.debug("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
                 startingVersions.get(startingVersions.size() - 1));
           }
         }
@@ -588,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
 
     if (replicaType == Replica.Type.TLOG) {
-      log.info("Stopping replication from leader for {}", coreName);
+      log.debug("Stopping replication from leader for {}", coreName);
       zkController.stopReplicationFromLeader(coreName);
     }
 
-    log.info("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
+    log.debug("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
 
     zkController.publish(core.getCoreDescriptor(), Replica.State.BUFFERING);
 
@@ -612,7 +612,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           return false;
         }
 
-        log.info("Begin buffering updates. core=[{}]", coreName);
+        log.debug("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
 
@@ -644,16 +644,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
             }
             if (syncSuccess) {
               SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
-              log.info("PeerSync was successful, commit to force open a new searcher");
+              log.debug("PeerSync was successful, commit to force open a new searcher");
               // force open a new searcher
               core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
               req.close();
-              log.info("PeerSync stage of recovery was successful.");
+              log.debug("PeerSync stage of recovery was successful.");
 
               // solrcloud_debug
               // cloudDebugLog(core, "synced");
 
-              log.info("Replaying updates buffered during PeerSync.");
+              log.debug("Replaying updates buffered during PeerSync.");
               replay(core);
 
               // sync success
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 261264c..353e857 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -77,7 +77,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
             // We do this by using a multi and ensuring the parent znode of the leader registration node
             // matches the version we expect - there is a setData call that increments the parent's znode
             // version whenever a leader registers.
-            log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
+            log.debug("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
             List<Op> ops = new ArrayList<>(3);
             ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
             ops.add(Op.delete(leaderSeqPath, -1));
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 0a9c52c1..8491334 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -79,7 +79,6 @@ public class StatePublisher implements Closeable {
 
     @Override
     public void run() {
-      ActionThrottle throttle = new ActionThrottle("StatePublisherWorker", 0);
 
       while (!terminated && !zkStateReader.getZkClient().isClosed()) {
         if (!zkStateReader.getZkClient().isConnected()) {
@@ -92,8 +91,7 @@ public class StatePublisher implements Closeable {
           }
           continue;
         }
-        throttle.minimumWaitBetweenActions();
-        throttle.markAttemptingAction();
+
         ZkNodeProps message = null;
         ZkNodeProps bulkMessage = new ZkNodeProps();
         bulkMessage.getProperties().put(OPERATION, "state");
@@ -101,33 +99,39 @@ public class StatePublisher implements Closeable {
           try {
             message = workQueue.poll(1000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
-            ParWork.propagateInterrupt(e, true);
-            return;
+
           }
           if (message != null) {
-            if (log.isDebugEnabled()) log.debug("Got state message " + message);
+            log.debug("Got state message " + message);
             if (message == TERMINATE_OP) {
+              log.debug("State publish is terminated");
               terminated = true;
-              message = null;
+              return;
             } else {
               bulkMessage(message, bulkMessage);
             }
 
-            while (message != null && !terminated) {
+            while (!terminated) {
               try {
-                message = workQueue.poll(5, TimeUnit.MILLISECONDS);
+                message = workQueue.poll(100, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
-
+                log.warn("state publisher interrupted", e);
+                return;
               }
-              if (log.isDebugEnabled()) log.debug("Got state message " + message);
               if (message != null) {
+                if (log.isDebugEnabled()) log.debug("Got state message " + message);
                 if (message == TERMINATE_OP) {
                   terminated = true;
                 } else {
                   bulkMessage(message, bulkMessage);
                 }
+              } else {
+                break;
               }
             }
+          }
+
+          if (bulkMessage.getProperties().size() > 1) {
             processMessage(bulkMessage);
           }
 
@@ -141,25 +145,28 @@ public class StatePublisher implements Closeable {
       }
     }
 
-    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
+    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
+      if (log.isDebugEnabled()) log.debug("Bulk state zkNodeProps={} bulkMessage={}", zkNodeProps, bulkMessage);
       if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
         String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+        //clearStatesForNode(bulkMessage, nodeName);
         bulkMessage.getProperties().put(OverseerAction.DOWNNODE.toLower(), nodeName);
+        log.debug("bulk state publish down node, props={} result={}", zkNodeProps, bulkMessage);
 
-        clearStatesForNode(bulkMessage, nodeName);
       } else if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.RECOVERYNODE) {
+        log.debug("bulk state publish recovery node, props={} result={}", zkNodeProps, bulkMessage);
         String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+       // clearStatesForNode(bulkMessage, nodeName);
         bulkMessage.getProperties().put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
-
-        clearStatesForNode(bulkMessage, nodeName);
+        log.debug("bulk state publish recovery node, props={} result={}" , zkNodeProps, bulkMessage);
       } else {
-        String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+        //String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
         String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
         String id = zkNodeProps.getStr("id");
         String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
 
         String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
-        if (log.isDebugEnabled()) log.debug("Bulk publish core={} id={} line={}", core, id, line);
+        if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
         bulkMessage.getProperties().put(id, line);
       }
     }
@@ -188,11 +195,8 @@ public class StatePublisher implements Closeable {
     }
 
     private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
-      if (message.getProperties().size() <= 1) {
-        return;
-      }
+      log.debug("Send state updates to Overseer {}", message);
       byte[] updates = Utils.toJSON(message);
-      if (log.isDebugEnabled()) log.debug("Send state updates to Overseer {}", message);
       overseerJobQueue.offer(updates);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5d85691..680e57c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -325,8 +325,7 @@ public class ZkController implements Closeable, Runnable {
           try {
             zkController.register(descriptor.getName(), descriptor, afterExpiration);
           } catch (Exception e) {
-            log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration);
-            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+            log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
           }
         }
         return descriptor;
@@ -1263,7 +1262,7 @@ public class ZkController implements Closeable, Runnable {
       final String collection = cloudDesc.getCollectionName();
       final String shardId = cloudDesc.getShardId();
 
-      log.info("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+      log.debug("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
 
       DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
       if (docCollection != null) {
@@ -1274,14 +1273,9 @@ public class ZkController implements Closeable, Runnable {
         }
       }
 
-      // multiple calls of this method will left only one watcher
+      log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
 
-      getZkStateReader().registerCore(cloudDesc.getCollectionName(), coreName);
-
-
-      log.info("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
-
-      log.info("Register terms for replica {}", coreName);
+      log.debug("Register terms for replica {}", coreName);
 
       registerShardTerms(collection, cloudDesc.getShardId(), coreName);
 
@@ -1333,7 +1327,7 @@ public class ZkController implements Closeable, Runnable {
             throw new AlreadyClosedException();
           }
 
-          log.info("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
+          log.debug("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
         }
       }
 
@@ -1345,9 +1339,9 @@ public class ZkController implements Closeable, Runnable {
 
       boolean isLeader = leaderName.equals(coreName);
 
-      log.info("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
+      log.debug("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
 
-      log.info("Check if we should recover isLeader={}", isLeader);
+      log.debug("Check if we should recover isLeader={}", isLeader);
       //assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
 
       // recover from local transaction log and wait for it to complete before
@@ -1415,7 +1409,7 @@ public class ZkController implements Closeable, Runnable {
       // MRM TODO:
      // registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
 
-      log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+      log.debug("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
 
       desc.getCloudDescriptor().setHasRegistered(true);
 
@@ -1581,7 +1575,7 @@ public class ZkController implements Closeable, Runnable {
    */
   public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
     MDCLoggingContext.setCoreName(cd.getName());
-    log.info("publishing state={}", state);
+    log.debug("publishing state={}", state);
     String collection = cd.getCloudDescriptor().getCollectionName();
     String shardId = cd.getCloudDescriptor().getShardId();
     Map<String,Object> props = new HashMap<>();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index 774e77a..f99fc02 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -105,7 +105,7 @@ public class Assign {
     int cnt = overseer.getZkStateWriter().getReplicaAssignCnt(collection.getName(), shard);
 
     String corename = String.format(Locale.ROOT, "%s%s", namePrefix, cnt);
-    log.info("Assigned SolrCore name={} id={}", corename, cnt);
+    log.debug("Assigned SolrCore name={} id={}", corename, cnt);
     ReplicaName replicaName = new ReplicaName();
     replicaName.coreName = corename;
     replicaName.id = cnt;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
index e498d4a..1a5c92f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
@@ -111,7 +111,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
       ShardRequestTracker shardRequestTracker, @SuppressWarnings({"rawtypes"})NamedList results)
       throws IOException, InterruptedException, KeeperException {
 
-    log.info("addReplica() : {}", Utils.toJSONString(message));
+    log.debug("addReplica() : {}", Utils.toJSONString(message));
 
     String extCollectionName = message.getStr(COLLECTION_PROP);
     boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
@@ -194,7 +194,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
 
       ModifiableSolrParams params = getReplicaParams(collection, message, results, skipCreateReplicaInClusterState, shardHandler, createReplica);
 
-      log.info("create replica {} params={}", createReplica, params);
+      log.debug("create replica {} params={}", createReplica, params);
       if (!onlyUpdateState) {
         shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
       }
@@ -220,7 +220,6 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
           public Response call() {
             if (!onlyUpdateState && createdShardHandler) {
               try {
-                 log.info("Processs responses");
                 shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
               } catch (Exception e) {
                 ParWork.propagateInterrupt(e);
@@ -357,7 +356,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
   public static CreateReplica assignReplicaDetails(DocCollection coll,
                                                  ZkNodeProps message, ReplicaPosition replicaPosition, Overseer overseer) {
 
-    log.info("assignReplicaDetails {} {}", message, replicaPosition);
+    log.debug("assignReplicaDetails {} {}", message, replicaPosition);
 
     boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 0ba35e8..be522c6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -279,7 +279,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
             ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(), ZkStateReader.NUM_SHARDS_PROP, message.getStr(ZkStateReader.NUM_SHARDS_PROP), "shards", message.getStr("shards"),
             CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
         props.getProperties().putAll(addReplicaProps.getProperties());
-        if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
+        log.debug("Sending state update to populate clusterstate with new replica {}", props);
 
         clusterState = new CollectionCmdResponse(ocmh, true).call(clusterState, props, results).clusterState;
         // log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
@@ -319,8 +319,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         coresToCreate.put(coreName, sreq);
       }
 
-      Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
-      future.get();
+      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
+
       writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
 
       if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 91851f9..64c4377 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -183,7 +183,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       //  MRM TODO: - put this in finalizer and finalizer after all calls to allow parallel and forward momentum
       try {
-        overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false).get();
+        overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false);
       } catch (Exception e) {
         log.error("failure", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 29ea39c..9cb3980 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -197,7 +196,7 @@ public class DeleteReplicaCmd implements Cmd {
 
           if (waitForFinalState) {
             try {
-              ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
+              ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false);
               ocmh.overseer.writePendingUpdates(finalCollectionName1);
               waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
             } catch (Exception e) {
@@ -251,8 +250,8 @@ public class DeleteReplicaCmd implements Cmd {
         shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
       }
     }
-    List<OverseerCollectionMessageHandler.Finalize> finalizers = new ArrayList<>();
-    List<Future> futures = new ArrayList<>();
+    List<CollectionCmdResponse.Response> finalizers = new ArrayList<>();
+
     for (Map.Entry<Slice,Set<String>> entry : shardToReplicasMapping.entrySet()) {
       Slice shardSlice = entry.getKey();
       String shardId = shardSlice.getName();
@@ -265,26 +264,16 @@ public class DeleteReplicaCmd implements Cmd {
         clusterState = resp.clusterState;
         if (clusterState != null) {
           try {
-            futures.add(ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false));
+            ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
           } catch (Exception e) {
             log.error("failed sending update to zkstatewriter", e);
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
           }
         }
-        if (resp.asyncFinalRunner != null) {
-          finalizers.add(resp.asyncFinalRunner);
-        }
-      }
 
-      try {
-        for (Future future : futures) {
-          future.get();
-        }
-        ocmh.overseer.writePendingUpdates(collectionName);
-      } catch (Exception e) {
-        log.error("failed writing update to zkstatewriter", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        finalizers.add(resp);
       }
+
       results.add("shard_id", shardId);
       results.add("replicas_deleted", replicas);
     }
@@ -294,8 +283,8 @@ public class DeleteReplicaCmd implements Cmd {
     response.asyncFinalRunner = () -> {
       CollectionCmdResponse.Response resp = new CollectionCmdResponse.Response();
       resp.asyncFinalRunner = () -> {
-        for (OverseerCollectionMessageHandler.Finalize finalize : finalizers) {
-          finalize.call();
+        for (CollectionCmdResponse.Response finalize : finalizers) {
+          finalize.asyncFinalRunner.call();
         }
         return new CollectionCmdResponse.Response();
       };
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 5fbb7f9..a7f00bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -177,7 +177,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
       if (waitForFinalState) {
-        ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
+        ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false);
         ocmh.overseer.writePendingUpdates(collectionName).get();
         ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
           if (coll == null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index e9f6aaa..5fc01e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -278,7 +278,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
 
     CollectionCmdResponse.Response response = ocmh.addReplicaWithResp(clusterState, addReplicasProps, addResult);
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false).get();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false);
 
     // wait for the other replica to be active if the source replica was a leader
 
@@ -309,7 +309,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         try {
           response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
           String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
-          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false);
           asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
         } catch (SolrException e) {
           deleteResult.add("failure", e.toString());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index faa10df..c6f98da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -320,19 +320,18 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", responce.clusterState, results);
 
         CollectionCmdResponse.Response asyncResp = null;
-        Future future = null;
+
         if (responce.clusterState != null) {
           DocCollection docColl = responce.clusterState.getCollectionOrNull(collection);
 
           if (docColl != null) {
 
-            future = zkWriter.enqueueUpdate(docColl, null, false);
+            zkWriter.enqueueUpdate(docColl, null, false);
 
             if (responce != null && responce.asyncFinalRunner != null) {
               asyncResp = responce.asyncFinalRunner.call();
             }
             if (asyncResp == null || asyncResp.writeFuture == null) {
-              future.get();
               writeFuture2 = overseer.writePendingUpdates(collection);
             }
 
@@ -349,13 +348,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", asyncResp.clusterState);
           if (asyncResp.clusterState != null) {
             DocCollection docColl = asyncResp.clusterState.getCollectionOrNull(collection);
-
             if (docColl != null) {
-
-              zkWriter.enqueueUpdate(docColl, null, false).get();
-              if (future != null) {
-                future.get();
-              }
+              zkWriter.enqueueUpdate(docColl, null, false);
               writeFuture = overseer.writePendingUpdates(collection);
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 2b0fdec..3a55cd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -100,9 +100,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     for (ZkNodeProps sourceReplica : sourceReplicas) {
       @SuppressWarnings({"rawtypes"}) NamedList nl = new NamedList();
       String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
-      }
+
+      log.debug("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+
       String targetNode = target;
       if (targetNode == null) {
         Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
@@ -146,7 +146,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       runners.add(runner);
     }
     String collection = clusterState.getCollectionStates().keySet().iterator().next();
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false);
     ocmh.overseer.writePendingUpdates(collection);
 
     CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 793ac18..b19b3fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -349,7 +349,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 //        firstReplicaFutures.add(future);
       }
 
-      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
+      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false);
       ocmh.overseer.writePendingUpdates(collectionName);
 
       log.info("Clusterstate after adding new shard for split {}", clusterState);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index a16683a..cd80b5d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 
-import org.apache.solr.cloud.ActionThrottle;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.cloud.api.collections.Assign;
@@ -44,7 +43,6 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -66,12 +64,8 @@ public class ZkStateWriter {
 
   protected volatile Stats stats;
 
-  private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>(128, 0.75f, 16);
-
   private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
 
-  Map<String,DocCollection> failedUpdates = new ConcurrentHashMap<>();
-
   Map<Long,String> idToCollection = new ConcurrentHashMap<>(128, 0.75f, 16);
 
   private Map<String,DocAssign> assignMap = new ConcurrentHashMap<>(128, 0.75f, 16);
@@ -82,10 +76,8 @@ public class ZkStateWriter {
 
   private static class ColState {
     ReentrantLock collLock = new ReentrantLock(true);
-    ActionThrottle throttle = new ActionThrottle("ZkStateWriter", Integer.getInteger("solr.zkstatewriter.throttle", 0), new TimeSource.NanoTimeSource());
   }
 
-
   private AtomicLong ID = new AtomicLong();
 
   private Set<String> dirtyStructure = ConcurrentHashMap.newKeySet();
@@ -98,227 +90,206 @@ public class ZkStateWriter {
 
   }
 
-  public Future enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
-    return ParWork.getRootSharedExecutor().submit(() -> {
-
-      try {
-        if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={} docCollection={} cs={}", stateUpdate, docCollection, cs);
-        if (!stateUpdate) {
-
-          String collectionName = docCollection.getName();
-
-          ColState collState = collLocks.compute(collectionName, (s, colState) -> {
-            if (colState == null) {
-              ColState cState = new ColState();
-              return cState;
-            }
-            return colState;
-          });
-          collState.collLock.lock();
-
-          try {
+  public void enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
 
-            DocCollection currentCollection = cs.get(docCollection.getName());
-            log.debug("zkwriter collection={}", docCollection);
-            log.debug("zkwriter currentCollection={}", currentCollection);
-
-            idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+    try {
 
-            //          if (currentCollection != null) {
-            //            if (currentCollection.getId() != collection.getId()) {
-            //              removeCollection(collection.getName());
-            //            }
-            //          }
+      if (!stateUpdate) {
+        if (log.isDebugEnabled()) log.debug("enqueue structure change docCollection={}", docCollection);
 
-            if (currentCollection != null) {
+        String collectionName = docCollection.getName();
+        ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+          if (colState == null) {
+            ColState cState = new ColState();
+            return cState;
+          }
+          return colState;
+        });
+        collState.collLock.lock();
+        try {
 
-              currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
-              List<String> removeSlices = new ArrayList();
-              for (Slice slice : docCollection) {
-                Slice currentSlice = currentCollection.getSlice(slice.getName());
-                if (currentSlice != null) {
-                  if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
-                    removeSlices.add(slice.getName());
-                  } else {
-                    currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
-                  }
+          DocCollection currentCollection = cs.get(docCollection.getName());
+          log.debug("zkwriter collection={}", docCollection);
+          log.debug("zkwriter currentCollection={}", currentCollection);
+          dirtyStructure.add(docCollection.getName());
+          idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+
+          if (currentCollection != null) {
+            docCollection.setZnodeVersion(currentCollection.getZNodeVersion());
+            currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
+            List<String> removeSlices = new ArrayList();
+            for (Slice slice : docCollection) {
+              Slice currentSlice = currentCollection.getSlice(slice.getName());
+              if (currentSlice != null) {
+                if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
+                  removeSlices.add(slice.getName());
                 } else {
-                  if (slice.getProperties().get("remove") != null) {
-                    continue;
-                  }
-                  Set<String> remove = new HashSet<>();
+                  currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
+                }
+              } else {
+                if (slice.getProperties().get("remove") != null) {
+                  continue;
+                }
+                Set<String> remove = new HashSet<>();
 
-                  for (Replica replica : slice) {
+                for (Replica replica : slice) {
 
-                    if (replica.get("remove") != null) {
-                      remove.add(replica.getName());
-                    }
-                  }
-                  for (String removeReplica : remove) {
-                    slice.getReplicasMap().remove(removeReplica);
+                  if (replica.get("remove") != null) {
+                    remove.add(replica.getName());
                   }
-                  currentCollection.getSlicesMap().put(slice.getName(), slice);
                 }
-              }
-              for (String removeSlice : removeSlices) {
-                currentCollection.getSlicesMap().remove(removeSlice);
-              }
-              cs.put(currentCollection.getName(), currentCollection);
-
-            } else {
-              docCollection.getProperties().remove("pullReplicas");
-              docCollection.getProperties().remove("replicationFactor");
-              docCollection.getProperties().remove("maxShardsPerNode");
-              docCollection.getProperties().remove("nrtReplicas");
-              docCollection.getProperties().remove("tlogReplicas");
-              List<String> removeSlices = new ArrayList();
-              for (Slice slice : docCollection) {
-                Slice currentSlice = docCollection.getSlice(slice.getName());
-                if (currentSlice != null) {
-                  if (slice.getProperties().get("remove") != null) {
-                    removeSlices.add(slice.getName());
-                  }
+                for (String removeReplica : remove) {
+                  slice.getReplicasMap().remove(removeReplica);
                 }
+                currentCollection.getSlicesMap().put(slice.getName(), slice);
               }
-              for (String removeSlice : removeSlices) {
-                docCollection.getSlicesMap().remove(removeSlice);
+            }
+            for (String removeSlice : removeSlices) {
+              currentCollection.getSlicesMap().remove(removeSlice);
+            }
+            cs.put(currentCollection.getName(), currentCollection);
+
+          } else {
+            docCollection.getProperties().remove("pullReplicas");
+            docCollection.getProperties().remove("replicationFactor");
+            docCollection.getProperties().remove("maxShardsPerNode");
+            docCollection.getProperties().remove("nrtReplicas");
+            docCollection.getProperties().remove("tlogReplicas");
+            List<String> removeSlices = new ArrayList();
+            for (Slice slice : docCollection) {
+              Slice currentSlice = docCollection.getSlice(slice.getName());
+              if (currentSlice != null) {
+                if (slice.getProperties().get("remove") != null) {
+                  removeSlices.add(slice.getName());
+                }
               }
-
-              cs.put(docCollection.getName(), docCollection);
+            }
+            for (String removeSlice : removeSlices) {
+              docCollection.getSlicesMap().remove(removeSlice);
             }
 
-            dirtyStructure.add(collectionName);
-
-          } finally {
-            collState.collLock.unlock();
+            cs.put(docCollection.getName(), docCollection);
           }
-        } else {
 
-          for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
-
-            ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
-              if (reentrantLock == null) {
-                ColState colState = new ColState();
-                return colState;
-              }
-              return reentrantLock;
-            });
+        } finally {
+          collState.collLock.unlock();
+        }
+      } else {
+        if (log.isDebugEnabled()) log.debug("enqueue state change states={}", collStateUpdates);
+        for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+
+          ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
+            if (reentrantLock == null) {
+              ColState colState = new ColState();
+              return colState;
+            }
+            return reentrantLock;
+          });
 
-            collState.collLock.lock();
-            try {
-              String collectionId = entry.getKey();
-              String collection = idToCollection.get(Long.parseLong(collectionId));
-              if (collection == null) {
-                log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
-                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
-              }
+          collState.collLock.lock();
+          try {
+            String collectionId = entry.getKey();
+            String collection = idToCollection.get(Long.parseLong(collectionId));
+            if (collection == null) {
+              log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
+            }
 
-              ZkNodeProps updates = stateUpdates.get(collection);
-              if (updates == null) {
-                updates = new ZkNodeProps();
-                stateUpdates.put(collection, updates);
-              }
+            ZkNodeProps updates = stateUpdates.get(collection);
+            if (updates == null) {
+              updates = new ZkNodeProps();
+              stateUpdates.put(collection, updates);
+            }
 
+            DocCollection docColl = cs.get(collection);
+            String csVersion;
+            if (docColl != null) {
+              csVersion = Integer.toString(docColl.getZNodeVersion());
               for (StateUpdate state : entry.getValue()) {
-
-                Integer ver = trackVersions.get(collection);
-                if (ver == null) {
-                  ver = 0;
+                if (state.sliceState != null) {
+                  Slice slice = docColl.getSlice(state.sliceName);
+                  if (slice != null) {
+                    slice.setState(Slice.State.getState(state.sliceState));
+                  }
+                  dirtyStructure.add(collection);
+                  continue;
                 }
-                updates.getProperties().put("_cs_ver_", ver.toString());
-
-                log.debug("version for state updates {}", ver.toString());
 
-                DocCollection docColl = cs.get(collection);
-                if (docColl != null) {
-
-                  if (state.sliceState != null) {
-                    Slice slice = docColl.getSlice(state.sliceName);
-                    if (slice != null) {
-                      slice.setState(Slice.State.getState(state.sliceState));
+                Replica replica = docColl.getReplicaById(state.id);
+                log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+                if (replica != null) {
+                  String setState = Replica.State.shortStateToState(state.state).toString();
+                  log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
+                  if (setState.equals("leader")) {
+                    if (log.isDebugEnabled()) {
+                      log.debug("set leader {}", replica);
                     }
-                    dirtyStructure.add(docColl.getName());
-                    continue;
-                  }
-
-                  Replica replica = docColl.getReplicaById(state.id);
-                  log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
-                  if (replica != null) {
-                    String setState = Replica.State.shortStateToState(state.state).toString();
-                    //                        if (blockedNodes.contains(replica.getNodeName())) {
-                    //                          continue;
-                    //                        }
-                    log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
-                    if (setState.equals("leader")) {
-                      if (log.isDebugEnabled()) {
-                        log.debug("set leader {}", replica);
-                      }
-                      Slice slice = docColl.getSlice(replica.getSlice());
-                      slice.setLeader(replica);
-                      replica.setState(Replica.State.ACTIVE);
-                      replica.getProperties().put("leader", "true");
-                      Collection<Replica> replicas = slice.getReplicas();
-                      for (Replica r : replicas) {
-                        if (r != replica) {
-                          r.getProperties().remove("leader");
-                        }
-                      }
-                      updates.getProperties().put(replica.getInternalId(), "l");
-                      dirtyState.add(collection);
-                    } else {
-                      Replica.State s = Replica.State.getState(setState);
-                      Replica existingLeader = docColl.getSlice(replica).getLeader();
-                      if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
-                        docColl.getSlice(replica).setLeader(null);
+                    Slice slice = docColl.getSlice(replica.getSlice());
+                    slice.setLeader(replica);
+                    replica.setState(Replica.State.ACTIVE);
+                    replica.getProperties().put("leader", "true");
+                    Collection<Replica> replicas = slice.getReplicas();
+                    for (Replica r : replicas) {
+                      if (r != replica) {
+                        r.getProperties().remove("leader");
                       }
-                      updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
-                      log.debug("set state {} {}", state, replica);
-                      replica.setState(s);
-                      dirtyState.add(collection);
                     }
-                  } else {
-                    log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
-                  }
-                } else {
-                  log.debug("Could not find existing collection name={}", collection);
-                  String setState = Replica.State.shortStateToState(state.state).toString();
-                  if (setState.equals("leader")) {
-                    updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+                    updates.getProperties().put(replica.getInternalId(), "l");
                     dirtyState.add(collection);
                   } else {
                     Replica.State s = Replica.State.getState(setState);
-                    updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+                    Replica existingLeader = docColl.getSlice(replica).getLeader();
+                    if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
+                      docColl.getSlice(replica).setLeader(null);
+                    }
+                    updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
+                    log.debug("set state {} {}", state, replica);
+                    replica.setState(s);
                     dirtyState.add(collection);
                   }
+                } else {
+                  log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
                 }
               }
-
-              String coll = entry.getKey();
-              dirtyState.add(coll);
-              Integer ver = trackVersions.get(coll);
-              if (ver == null) {
-                ver = 0;
-              }
-              updates.getProperties().put("_cs_ver_", ver.toString());
-              for (StateUpdate theUpdate : entry.getValue()) {
-                updates.getProperties().put(theUpdate.id.substring(theUpdate.id.indexOf("-") + 1), theUpdate.state);
+            } else {
+              for (StateUpdate state : entry.getValue()) {
+                log.warn("Could not find existing collection name={}", collection);
+//                String setState = Replica.State.shortStateToState(state.state).toString();
+//                if (setState.equals("leader")) {
+//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+//                  dirtyState.add(collection);
+//                } else {
+//                  Replica.State s = Replica.State.getState(setState);
+//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+//                  dirtyState.add(collection);
+//                }
               }
+              log.debug("version for state updates 0");
+              csVersion = "0";
+            }
 
-            } finally {
-              collState.collLock.unlock();
+            if (dirtyState.contains(collection)) {
+              updates.getProperties().put("_cs_ver_", csVersion);
             }
+
+          } finally {
+            collState.collLock.unlock();
           }
         }
-        
-      } catch (Exception e) {
-        log.error("Exception while queuing update", e);
-        throw e;
       }
-    });
+
+    } catch (Exception e) {
+      log.error("Exception while queuing update", e);
+      throw e;
+    }
   }
 
   public Integer lastWrittenVersion(String collection) {
-    return trackVersions.get(collection);
+    DocCollection col = cs.get(collection);
+    if (col == null) {
+      return 0;
+    }
+    return col.getZNodeVersion();
   }
 
   /**
@@ -326,31 +297,29 @@ public class ZkStateWriter {
    *
    */
 
-  // if additional updates too large, publish structure change
-  public void writePendingUpdates(String collection) {
-
-    do {
-      try {
-        write(collection);
-        break;
-      } catch (KeeperException.BadVersionException e) {
-
-      } catch (Exception e) {
-        log.error("write pending failed", e);
-        break;
-      }
+  public Future writePendingUpdates(String collection) {
+    return ParWork.getRootSharedExecutor().submit(() -> {
+      do {
+        try {
+          write(collection);
+          break;
+        } catch (KeeperException.BadVersionException e) {
 
-    } while (!overseer.isClosed());
+        } catch (Exception e) {
+          log.error("write pending failed", e);
+          break;
+        }
 
+      } while (!overseer.isClosed() && !overseer.getZkStateReader().getZkClient().isClosed());
+    });
   }
 
   private void write(String coll) throws KeeperException.BadVersionException {
 
     if (log.isDebugEnabled()) {
-      log.debug("writePendingUpdates {}", cs);
+      log.debug("writePendingUpdates {}", coll);
     }
 
-    AtomicInteger lastVersion = new AtomicInteger();
     AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
 
     DocCollection collection = cs.get(coll);
@@ -359,133 +328,104 @@ public class ZkStateWriter {
       return;
     }
 
-    if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
-    Integer version = null;
-    if (dirtyStructure.contains(collection.getName())) {
-      log.info("process collection {}", collection);
-      ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
-        if (reentrantLock == null) {
-          ColState colState = new ColState();
-          return colState;
-        }
-        return reentrantLock;
-      });
-
-      collState.collLock.lock();
-      try {
-        collState.throttle.minimumWaitBetweenActions();
-        collState.throttle.markAttemptingAction();
-        String name = collection.getName();
-        String path = ZkStateReader.getCollectionPath(collection.getName());
-        String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
-        // log.info("process collection {} path {}", collection.getName(), path);
-        Stat existsStat = null;
-        if (log.isTraceEnabled()) log.trace("process {}", collection);
-        try {
-          // log.info("get data for {}", name);
-
-          //  log.info("got data for {} {}", name, data.length);
+    log.debug("process collection {}", coll);
+    ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
+      if (reentrantLock == null) {
+        ColState colState = new ColState();
+        return colState;
+      }
+      return reentrantLock;
+    });
+    collState.collLock.lock();
+    try {
+      collection = cs.get(coll);
 
-          try {
+      if (collection == null) {
+        return;
+      }
 
-            if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
-            byte[] data = Utils.toJSON(singletonMap(name, collection));
-            Integer v = trackVersions.get(collection.getName());
+      if (log.isTraceEnabled()) log.trace("check collection {} {} {}", collection, dirtyStructure, dirtyState);
 
-            if (v != null) {
-              //log.info("got version from cache {}", v);
-              version = v;
-            } else {
-              version = 0;
-            }
-            lastVersion.set(version);
-            if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+      //  collState.throttle.minimumWaitBetweenActions();
+      //  collState.throttle.markAttemptingAction();
+      String name = collection.getName();
+      String path = ZkStateReader.getCollectionPath(collection.getName());
+      String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
 
-            reader.getZkClient().setData(path, data, version, true, false);
-            if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+      if (log.isTraceEnabled()) log.trace("process {}", collection);
+      try {
 
-            reader.getZkClient().setData(pathSCN, null, -1, true, false);
+        if (dirtyStructure.contains(name)) {
+          if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
 
-            dirtyStructure.remove(collection.getName());
+          byte[] data = Utils.toJSON(singletonMap(name, collection));
 
-            ZkNodeProps updates = stateUpdates.get(collection.getName());
-            if (updates != null) {
-              updates.getProperties().clear();
-              //(collection, updates);
-              //dirtyState.remove(collection.getName());
-            }
+          if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
 
-            trackVersions.put(collection.getName(), version + 1);
+          Integer finalVersion = collection.getZNodeVersion();
+          dirtyStructure.remove(collection.getName());
+          if (reader == null) {
+            log.error("read not initialized in zkstatewriter");
+          }
+          if (reader.getZkClient() == null) {
+            log.error("zkclient not initialized in zkstatewriter");
+          }
 
+          Stat stat;
+          try {
+            stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
+            collection.setZnodeVersion(finalVersion + 1);
+            if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
           } catch (KeeperException.NoNodeException e) {
-            if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
-            lastVersion.set(-1);
-            trackVersions.remove(collection.getName());
-            stateUpdates.remove(collection.getName());
-            cs.remove(collection);
-            // likely deleted
+            log.debug("No node found for state.json", e);
 
           } catch (KeeperException.BadVersionException bve) {
-            log.info("Tried to update state.json ({}) with bad version", collection);
-            //lastFailedException.set(bve);
-            //failedUpdates.put(collection.getName(), collection);
-            // Stat estate = reader.getZkClient().exists(path, null);
-            trackVersions.remove(collection.getName());
-            Stat stat = reader.getZkClient().exists(path, null, false, false);
-            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
-            if (!overseer.isClosed() && stat != null) {
-              trackVersions.put(collection.getName(), stat.getVersion());
-            }
+            stat = reader.getZkClient().exists(path, null, false, false);
+            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, finalVersion, stat != null ? stat.getVersion() : "null");
+
             throw bve;
           }
 
-        } catch (KeeperException.BadVersionException bve) {
-          badVersionException.set(bve);
-        } catch (InterruptedException | AlreadyClosedException e) {
-          log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+          reader.getZkClient().setData(pathSCN, null, -1, true, false);
 
-        } catch (Exception e) {
-          log.error("Failed processing update=" + collection, e);
-        }
-      } finally {
-        collState.collLock.unlock();
-      }
-    }
+          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          if (updates != null) {
+            updates.getProperties().clear();
+          }
 
-    if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
-      ZkNodeProps updates = stateUpdates.get(collection.getName());
-      if (updates != null) {
-        try {
-          writeStateUpdates(collection, updates);
-        } catch (Exception e) {
-          log.error("exception writing state updates", e);
+        } else if (dirtyState.contains(collection.getName())) {
+          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          if (updates != null) {
+            try {
+              writeStateUpdates(collection, updates);
+            } catch (Exception e) {
+              log.error("exception writing state updates", e);
+            }
+          }
         }
-      }
-    }
 
-    //removeCollections.forEach(c ->  removeCollection(c));
+      } catch (KeeperException.BadVersionException bve) {
+        badVersionException.set(bve);
+      } catch (InterruptedException | AlreadyClosedException e) {
+        log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+        throw new AlreadyClosedException(e);
 
-    if (badVersionException.get() != null) {
-      throw badVersionException.get();
-    }
-
-    //log.info("Done with successful cluster write out");
+      } catch (Exception e) {
+        log.error("Failed processing update=" + collection, e);
+      }
 
-    //    } finally {
-    //      writeLock.unlock();
-    //    }
-    // MRM TODO: - harden against failures and exceptions
+      if (badVersionException.get() != null) {
+        throw badVersionException.get();
+      }
 
-    //    if (log.isDebugEnabled()) {
-    //      log.debug("writePendingUpdates() - end - New Cluster State is: {}", newClusterState);
-    //    }
+    } finally {
+      collState.collLock.unlock();
+    }
   }
 
   private void writeStateUpdates(DocCollection collection, ZkNodeProps updates) throws KeeperException, InterruptedException {
     String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
-    if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
+    if (log.isDebugEnabled()) log.debug("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
     try {
       reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
     } catch (KeeperException.NoNodeException e) {
@@ -519,14 +459,6 @@ public class ZkStateWriter {
     });
     collState.collLock.lock();
     try {
-      stateUpdates.remove(collection);
-      cs.remove(collection);
-      assignMap.remove(collection);
-      trackVersions.remove(collection);
-      dirtyStructure.remove(collection);
-      dirtyState.remove(collection);
-      ZkNodeProps message = new ZkNodeProps("name", collection);
-      cs.remove(collection);
       Long id = null;
       for (Map.Entry<Long, String> entry : idToCollection.entrySet()) {
         if (entry.getValue().equals(collection)) {
@@ -536,6 +468,12 @@ public class ZkStateWriter {
       }
       if (id != null) {
         idToCollection.remove(id);
+        stateUpdates.remove(collection);
+        cs.remove(collection);
+        assignMap.remove(collection);
+        dirtyStructure.remove(collection);
+        dirtyState.remove(collection);
+        cs.remove(collection);
       }
     } catch (Exception e) {
       log.error("", e);
@@ -557,17 +495,17 @@ public class ZkStateWriter {
 
 
       int id = docAssign.replicaAssignCnt.incrementAndGet();
-      log.info("assign id={} for collection={} slice={}", id, collection, shard);
+      log.debug("assign id={} for collection={} slice={}", id, collection, shard);
       return id;
     }
 
     int id = docAssign.replicaAssignCnt.incrementAndGet();
-    log.info("assign id={} for collection={} slice={}", id, collection, shard);
+    log.debug("assign id={} for collection={} slice={}", id, collection, shard);
     return id;
   }
 
   public void init() {
-    reader.forciblyRefreshAllClusterStateSlow();
+
     ClusterState readerState = reader.getClusterState();
     if (readerState != null) {
       cs.putAll(readerState.copy().getCollectionsMap());
@@ -575,31 +513,44 @@ public class ZkStateWriter {
 
     long[] highId = new long[1];
     cs.values().forEach(collection -> {
-      if (collection.getId() > highId[0]) {
-        highId[0] = collection.getId();
-      }
+      String collectionName = collection.getName();
+      ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+        if (colState == null) {
+          ColState cState = new ColState();
+          return cState;
+        }
+        return colState;
+      });
+      collState.collLock.lock();
+      try {
 
-      idToCollection.put(collection.getId(), collection.getName());
+        if (collection.getId() > highId[0]) {
+          highId[0] = collection.getId();
+        }
 
-      trackVersions.put(collection.getName(), collection.getZNodeVersion());
+        idToCollection.put(collection.getId(), collection.getName());
 
-      DocAssign docAssign = new DocAssign();
-      docAssign.name = collection.getName();
-      assignMap.put(docAssign.name, docAssign);
-      int max = 1;
-      Collection<Slice> slices = collection.getSlices();
-      for (Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-
-        for (Replica replica : replicas) {
-          Matcher matcher = Assign.pattern.matcher(replica.getName());
-          if (matcher.matches()) {
-            int val = Integer.parseInt(matcher.group(1));
-            max = Math.max(max, val);
+
+        DocAssign docAssign = new DocAssign();
+        docAssign.name = collection.getName();
+        assignMap.put(docAssign.name, docAssign);
+        int max = 1;
+        Collection<Slice> slices = collection.getSlices();
+        for (Slice slice : slices) {
+          Collection<Replica> replicas = slice.getReplicas();
+
+          for (Replica replica : replicas) {
+            Matcher matcher = Assign.pattern.matcher(replica.getName());
+            if (matcher.matches()) {
+              int val = Integer.parseInt(matcher.group(1));
+              max = Math.max(max, val);
+            }
           }
         }
+        docAssign.replicaAssignCnt.set(max);
+      } finally {
+        collState.collLock.unlock();
       }
-      docAssign.replicaAssignCnt.set(max);
     });
 
     ID.set(highId[0]);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5cc13bb..6775b88 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -156,6 +156,7 @@ public class CoreContainer implements Closeable {
   final SolrCores solrCores = new SolrCores(this);
   private volatile boolean startedLoadingCores;
   private volatile boolean loaded;
+  private volatile SolrClient metricsHistoryHandlerCloudClient;
 
   public static class CoreLoadFailure {
 
@@ -877,7 +878,7 @@ public class CoreContainer implements Closeable {
 
             List<Replica> replicas = c.getReplicas();
             for (Replica replica : replicas) {
-              log.debug("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
+              log.trace("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
               if (replica.getNodeName().equals(nodeName)) {
                 if (replica.getState().equals(State.ACTIVE)) {
                   if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={} replica={}", replica.getState(), replica.getNodeName(), nodeName, replica);
@@ -998,20 +999,20 @@ public class CoreContainer implements Closeable {
     }
     String name;
     SolrCloudManager cloudManager;
-    SolrClient client;
+
     if (isZooKeeperAware()) {
       name = getZkController().getNodeName();
       cloudManager = getZkController().getSolrCloudManager();
-      client = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
+      metricsHistoryHandlerCloudClient = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
           .withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().build();
-      ((CloudHttp2SolrClient)client).connect();
+      ((CloudHttp2SolrClient) metricsHistoryHandlerCloudClient).connect();
     } else {
       name = getNodeConfig().getNodeName();
       if (name == null || name.isEmpty()) {
         name = "127.0.0.1";
       }
       cloudManager = null;
-      client = new EmbeddedSolrServer();
+      metricsHistoryHandlerCloudClient = new EmbeddedSolrServer();
       // enable local metrics unless specifically set otherwise
       if (!initArgs.containsKey(MetricsHistoryHandler.ENABLE_NODES_PROP)) {
         initArgs.put(MetricsHistoryHandler.ENABLE_NODES_PROP, true);
@@ -1020,8 +1021,7 @@ public class CoreContainer implements Closeable {
         initArgs.put(MetricsHistoryHandler.ENABLE_REPLICAS_PROP, true);
       }
     }
-    metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler,
-        client, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
+    metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler, metricsHistoryHandlerCloudClient, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
     containerHandlers.put(METRICS_HISTORY_PATH, metricsHistoryHandler);
     metricsHistoryHandler.initializeMetrics(solrMetricsContext, METRICS_HISTORY_PATH);
   }
@@ -1169,7 +1169,7 @@ public class CoreContainer implements Closeable {
       if (auditloggerPlugin != null) {
         auditPlugin = auditloggerPlugin.plugin;
       }
-
+      closer.collect(metricsHistoryHandlerCloudClient);
       closer.collect(authPlugin);
       closer.collect(authenPlugin);
       closer.collect(auditPlugin);
@@ -1410,7 +1410,7 @@ public class CoreContainer implements Closeable {
   @SuppressWarnings("resource")
   private SolrCore createFromDescriptor(CoreDescriptor dcore, boolean newCollection) {
 
-    log.info("createFromDescriptor {} {}", dcore, newCollection);
+    log.debug("createFromDescriptor {} {}", dcore, newCollection);
 
     if (isShutDown()) {
       throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
@@ -1421,10 +1421,15 @@ public class CoreContainer implements Closeable {
     boolean registered = false;
     try {
       MDCLoggingContext.setCoreName(dcore.getName());
+
       StopWatch timeValidateCoreNameLoadConfigSet = new StopWatch(dcore.getName() + "-validateCoreNameLoadConfigSet");
 
       SolrIdentifierValidator.validateCoreName(dcore.getName());
 
+      if (isZooKeeperAware()) {
+        getZkController().getZkStateReader().registerCore(dcore.getCollectionName(), dcore.getName());
+      }
+
       ConfigSet coreConfig = coreConfigService.loadConfigSet(dcore);
       dcore.setConfigSetTrusted(coreConfig.isTrusted());
       if (log.isInfoEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 17c4351..f380a0f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1769,8 +1769,11 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       }
     }
 
-    void doClose () {
-
+    synchronized void doClose () {
+      if (refCount.get() == -1) {
+        log.warn("SolrCore is already closed {}", name);
+        return;
+      }
       try {
         if (closing) {
           this.closing = true;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6a8e1fa..5f9f90e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1077,7 +1077,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX);
       if (m.isEmpty()) {
         throw new SolrException(ErrorCode.BAD_REQUEST,
-            formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
+            formatString("no supported values provided {0} {1}", req.getParams(), CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
       }
       copy(req.getParams().required(), m, COLLECTION_PROP);
       for (Map.Entry<String, Object> entry : m.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
index 05f81bf..f4cf0b8 100644
--- a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
+++ b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
@@ -222,7 +222,7 @@ public abstract class ManagedResourceStorage {
         throw new SolrException(ErrorCode.SERVER_ERROR, errMsg, exc);
       }
       
-      log.info("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
+      log.debug("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
     }    
     
     @Override
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index d912d62..3943f82 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -101,7 +101,7 @@ public final class CommitTracker implements Runnable, Closeable {
     scheduler.setRemoveOnCancelPolicy(true);
     scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-    log.info("{} AutoCommit: {}", name, this);
+    log.debug("{} AutoCommit: {}", name, this);
     assert ObjectReleaseTracker.track(this);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index e17ef6a..a0c772f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -144,9 +144,8 @@ UpdateHandler implements SolrInfoBean, Closeable {
         ulog.clearLog(core, ulogPluginInfo);
       }
 
-      if (log.isInfoEnabled()) {
-        log.info("Using UpdateLog implementation: {}", ulog.getClass().getName());
-      }
+      log.debug("Using UpdateLog implementation: {}", ulog.getClass().getName());
+
       ulog.init(ulogPluginInfo);
       ulog.init(this, core);
     } else {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index dfb3679..b9c6461 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -391,7 +391,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
           "Number of version buckets must be greater than 0!");
 
-    log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
+    log.debug("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
         dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep, numVersionBuckets);
   }
 
@@ -2386,7 +2386,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
       if (maxVersion == 0L) {
         maxVersion = versions.getNewClock();
-        log.info("Could not find max version in index or recent updates, using new clock {}", maxVersion);
+        log.debug("Could not find max version in index or recent updates, using new clock {}", maxVersion);
       }
 
       // seed all version buckets with the highest value from recent and index
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 61a785d..4f9e619 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -277,7 +277,7 @@ public class VersionInfo {
 
     final String versionFieldName = versionField.getName();
 
-    log.info("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
+    log.debug("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
     // if indexed, then we have terms to get the max from
     if (versionField.indexed()) {
       if (versionField.getType().isPointField()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 620b890..bfd55b3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
 
   @Test
   public void start() throws Exception {
-    int collectionCnt = 40;
+    int collectionCnt = 5;
     List<Future> futures = new ArrayList<>();
     List<Future> indexFutures = new ArrayList<>();
     for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 91fd066..b88e93f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -51,9 +51,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
 //@LuceneTestCase.Nightly
 public class TestCollectionAPI extends ReplicaPropertiesBase {
 
@@ -909,6 +910,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
       params.set("router.name", "implicit");
       params.set("numShards", "1");
       params.set("shards", "invalid@name#with$weird%characters");
+      params.set(WAIT_FOR_FINAL_STATE, "true");
       SolrRequest request = new QueryRequest(params);
       request.setPath("/admin/collections");
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 86c7995..b70b08d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -379,7 +379,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
       String url = zkClientClusterStateProvider.getZkStateReader().getBaseUrlForNodeName(solrNode);
 
       try {
-        GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
+        GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.POST, path, params); // params too long for GET
         try (Http2SolrClient client = new Http2SolrClient.Builder().withHttpClient(httpClient).withBaseUrl(url).markInternalRequest().build()) {
           NamedList<Object> rsp = client.request(request);
           request.response.nl = rsp;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index dc36207..d7d41d4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -215,6 +215,20 @@ public class ClusterState implements JSONWriter.Writable {
     return createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
   }
 
+  public static DocCollection createDocCollectionFromJson(Replica.NodeNameToBaseUrl nodeNameToBaseUrl, int version, byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      return null;
+    }
+    Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes);
+    ClusterState cs = createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
+    if (cs.getCollectionsMap().size() == 0) {
+      return null;
+    }
+    DocCollection docCollection = cs.getCollectionsMap().values().iterator().next();
+    docCollection.setZnodeVersion(version);
+    return docCollection;
+  }
+
   public static ClusterState createFromCollectionMap(Replica.NodeNameToBaseUrl zkStateReader, int version, Map<String, Object> stateMap) {
     Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 814f808..59cd767 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -93,7 +93,7 @@ public class ConnectionManager implements Watcher, Closeable {
       throw new IllegalStateException("You must call start on " + SolrZkClient.class.getName() + " before you can use it");
     }
 
-    if (keeper != null && isClosed && !keeper.getState().isAlive()) {
+    if (keeper != null && (isClosed || !keeper.getState().isAlive())) {
       throw new AlreadyClosedException(this + " SolrZkClient is not currently connected state=" + keeper.getState());
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 3c0326f..b55ce1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -51,7 +51,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
 
-  private final int znodeVersion;
+  private int znodeVersion;
 
   private final String name;
   private final Map<String, Slice> slices;
@@ -462,4 +462,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public Map getStateUpdates() {
     return stateUpdates;
   }
+
+  public void setZnodeVersion(int version) {
+    this.znodeVersion = version;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 80eea86..2a48875 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -426,9 +426,11 @@ public class SolrZkClient implements Closeable {
     return setData(path, data, version, retryOnConnLoss, false);
   }
 
-  /**
-   * Returns node's state
-   */
+  public void setData(final String path, final byte data[], final int version, AsyncCallback.StatCallback cb, Object ctx)
+      throws KeeperException, InterruptedException {
+    connManager.getKeeper().setData(path, data, version, cb, ctx);
+  }
+
   public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss, boolean retryOnSessionExpiration)
       throws KeeperException, InterruptedException {
 
@@ -823,21 +825,13 @@ public class SolrZkClient implements Closeable {
     return dataMap;
   }
 
-  public void delete(Collection<String> paths, boolean wait) throws KeeperException {
+  public CountDownLatch delete(Collection<String> paths, boolean wait) throws KeeperException {
     if (log.isDebugEnabled()) log.debug("delete paths {} wait={}", paths, wait);
-    if (paths.size() == 0) {
-      return;
-    }
-    CountDownLatch latch = null;
-    if (wait) {
-      latch = new CountDownLatch(paths.size());
-    }
+    CountDownLatch latch = new CountDownLatch(paths.size());
+
     KeeperException[] ke = new KeeperException[1];
     for (String path : paths) {
       if (log.isDebugEnabled()) log.debug("process path={} connManager={}", path, connManager);
-  
-
-      CountDownLatch finalLatch = latch;
 
       connManager.getKeeper().delete(path, -1, (rc, path1, ctx) -> {
         try {
@@ -855,9 +849,7 @@ public class SolrZkClient implements Closeable {
             }
           }
         } finally {
-          if (wait) {
-            finalLatch.countDown();
-          }
+          latch.countDown();
         }
       }, null);
 
@@ -867,43 +859,26 @@ public class SolrZkClient implements Closeable {
       log.debug("done with all paths, see if wait ... wait={}", wait);
     }
     if (wait) {
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-      boolean success = false;
-      while (!timeout.hasTimedOut() && !isClosed) {
-        if (!connManager.getKeeper().getState().isConnected()) {
-          try {
-            connManager.waitForConnected(30000);
-          } catch (TimeoutException e) {
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-          } catch (InterruptedException e) {
-            ParWork.propagateInterrupt(e);
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-          }
-        }
+      try {
+        boolean success = latch.await(10, TimeUnit.SECONDS);
 
-        if (ke[0] != null) {
-          throw ke[0];
-        }
-        try {
-          success = latch.await(10, TimeUnit.SECONDS);
-          if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
-          if (success) {
-            break;
+        if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+        if (success) {
+          if (ke[0] != null) {
+            throw ke[0];
           }
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          log.error("", e);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
         }
+      } catch (InterruptedException e) {
+        ParWork.propagateInterrupt(e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
     }
 
-    if (ke[0] != null) {
-      throw ke[0];
-    }
     if (log.isDebugEnabled()) {
       log.debug("done with delete {} {}", paths, wait);
     }
+
+    return latch;
   }
 
   // Calls setData for a list of existing paths in parallel
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 12d2671..8290360 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -260,7 +260,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     public boolean canBeRemoved() {
       int refCount = coreRefCount.get();
       int watcherCount = stateWatchers.size();
-      log.debug("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
+      log.trace("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
       return refCount <= 0 && watcherCount <= 0;
     }
 
@@ -449,7 +449,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (log.isDebugEnabled()) {
         log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
       }
-      DocCollection nu = getCollectionLive(this, coll);
+      DocCollection nu = getCollectionLive(coll);
       if (nu == null) return -3;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
         if (updateWatchedCollection(coll, nu, false)) {
@@ -634,13 +634,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   private void constructState(DocCollection collection, String caller) {
 
-    if (log.isDebugEnabled()) log.debug("construct new cluster state on structure change {} {}", caller, collection);
+    if (log.isDebugEnabled()) log.trace("construct new cluster state on structure change {} {}", caller, collection);
 
 
-    if (log.isTraceEnabled()) {
-      log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
+
+    log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
           clusterState.keySet());
-    }
+
 //
 //    watchedCollectionStates.forEach((s, slices) -> {
 //      clusterState.putIfAbsent(s, new ClusterState.CollectionRef(slices));
@@ -689,8 +689,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           LazyCollectionRef docRef = new LazyCollectionRef(coll);
           LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
           if (old == null) {
-            log.debug("Created lazy collection {}", coll);
             clusterState.put(coll, docRef);
+
+            log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
+                watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
           }
         }
       }
@@ -752,7 +754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   private void notifyCloudCollectionsListeners(boolean notifyIfSame) {
-    if (log.isDebugEnabled()) log.debug("Notify cloud collection listeners {}", notifyIfSame);
+    log.trace("Notify cloud collection listeners {}", notifyIfSame);
     Set<String> newCollections;
     Set<String> oldCollections;
     boolean fire = true;
@@ -763,9 +765,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       fire = true;
     }
 
-    if (log.isDebugEnabled()) log.debug("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
+    log.trace("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
     if (fire) {
-
       cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(oldCollections, newCollections));
     }
   }
@@ -807,7 +808,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
         if (shouldFetch) {
           try {
-            DocCollection cdc = getCollectionLive(ZkStateReader.this, collName);
+            DocCollection cdc = getCollectionLive(collName);
             if (cdc != null) {
               cdc.setCreatedLazy();
               lastUpdateTime = System.nanoTime();
@@ -959,7 +960,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public Replica getLeader(String collection, String shard) {
-    return getLeader(getClusterState().getCollection(collection), shard);
+    return getLeader(getCollectionOrNull(collection), shard);
   }
 
   private Replica getLeader(DocCollection docCollection, String shard) {
@@ -1486,18 +1487,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     @Override
     public void close() throws IOException {
       this.closed = true;
-  //    IOUtils.closeQuietly(stateUpdateWatcher);
-//      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-//      if (zk != null) {
-//        try {
-//          zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
-//        } catch (KeeperException.NoWatcherException e) {
-//
-//        } catch (Exception e) {
-//          if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-//        }
-//      }
-//      IOUtils.closeQuietly(stateUpdateWatcher);
     }
 
     private class StateUpdateWatcher implements Watcher, Closeable {
@@ -1511,40 +1500,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       @Override
       public void close() throws IOException {
         this.closed = true;
-//        SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-//        if (zk != null) {
-//          if (stateUpdateWatcher != null) {
-//            try {
-//              zk.removeWatches(getCollectionStateUpdatesPath(coll), stateUpdateWatcher, WatcherType.Any, true);
-//            } catch (KeeperException.NoWatcherException e) {
-//
-//            } catch (Exception e) {
-//              if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-//            }
-//          }
-//        }
       }
 
       @Override
       public void process(WatchedEvent event) {
         if (zkClient.isClosed() || closed) return;
-        if (log.isDebugEnabled()) log.debug("_statupdates event {}", event);
+        log.trace("_statupdates event {}", event);
 
         try {
-
-          //            if (event.getType() == EventType.NodeDataChanged ||
-          //                event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
           collectionStateLock.lock();
           getAndProcessStateUpdates(coll, stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
-          //            }
-
         } catch (AlreadyClosedException e) {
 
         } catch (Exception e) {
           log.error("Unwatched collection: [{}]", coll, e);
         }
       }
-
     }
   }
 
@@ -1783,7 +1754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
   }
 
-  public DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+  public DocCollection getCollectionLive(String coll) {
     log.debug("getCollectionLive {}", coll);
     DocCollection newState;
     try {
@@ -1812,7 +1783,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, boolean live, DocCollection docCollection, ReentrantLock collectionStateLock) throws KeeperException, InterruptedException {
     DocCollection result = null;
     try {
-      log.debug("get and process state updates for {}", coll);
+      log.trace("get and process state updates for {}", coll);
 
       Stat stat;
       try {
@@ -1827,7 +1798,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       if (docCollection != null && docCollection.hasStateUpdates()) {
         int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
-        if (stat.getVersion() < oldVersion) {
+        if (stat.getVersion() <= oldVersion) {
           if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
           return docCollection;
         }
@@ -1854,7 +1825,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
 
       Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
-      if (log.isDebugEnabled()) log.debug("Got additional state updates with version {} {} cs={}", version, m, clusterState);
+      if (log.isDebugEnabled()) log.debug("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
 
       m.remove("_cs_ver_");
       m.put("_ver_", stat.getVersion());
@@ -1862,11 +1833,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         Set<Entry<String,Object>> entrySet = m.entrySet();
 
         if (docCollection != null) {
-          // || (version > docCollection.getZNodeVersion() && clusterState.getZkClusterStateVersion() == -1)) {
-//          if (!docCollection.hasStateUpdates() && version < docCollection.getZNodeVersion()) {
-//            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
-//            return docCollection;
-//          }
+          if (version < docCollection.getZNodeVersion()) {
+            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+            return docCollection;
+          }
 
           if (docCollection.hasStateUpdates()) {
             int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
@@ -1885,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             }
 
             Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
-            if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
+            if (log.isTraceEnabled()) log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica.getName(), id, docCollection.getReplicaByIds());
 
             if (replica != null) {
 
@@ -1902,7 +1872,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
                   if (replica.getName().equals(r.getName())) {
                     continue;
                   }
-                  log.debug("process non leader {} {}", r, r.getProperty(LEADER_PROP));
+                  log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
                   if ("true".equals(r.getProperties().get(LEADER_PROP))) {
                     log.debug("remove leader prop {}", r);
                     Map<String,Object> props = new HashMap<>(r.getProperties());
@@ -1912,7 +1882,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
                   }
                 }
               } else if (state != null && !properties.get(ZkStateReader.STATE_PROP).equals(state.toString())) {
-                if (log.isDebugEnabled()) log.debug("std state, set to {}", state);
+                log.trace("std state, set to {}", state);
                 properties.put(ZkStateReader.STATE_PROP, state.toString());
                 if ("true".equals(properties.get(LEADER_PROP))) {
                   properties.remove(LEADER_PROP);
@@ -1921,7 +1891,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
               Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
 
-              if (log.isDebugEnabled()) log.debug("add new replica {}", newReplica);
+              log.trace("add new replica {}", newReplica);
 
               replicasMap.put(replica.getName(), newReplica);
 
@@ -1933,7 +1903,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
               Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
               newSlices.put(slice.getName(), newSlice);
 
-              if (log.isDebugEnabled()) log.debug("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+              log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
 
               DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), m);
               docCollection = newDocCollection;
@@ -1957,7 +1927,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             //
             //            });
 
-            if (log.isDebugEnabled()) log.debug("Set a new clusterstate based on update diff {}", result);
+            log.trace("Set a new clusterstate based on update diff {}", result);
 
             updateWatchedCollection(coll, result, false);
             constructState(result);
@@ -2001,39 +1971,41 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
       }
 
-      if (lazyCollectionStates.containsKey(coll)) {
-        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
-        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
-        if (cachedCollection != null) {
-          int localVersion = cachedCollection.getZNodeVersion();
-          if (cachedCollection.hasStateUpdates()) {
-            if (localVersion == version) {
-              return cachedCollection;
-            }
-          } else {
-            if (localVersion == version) {
-              return cachedCollection;
-            }
-          }
-        }
-
-      }
+//      if (lazyCollectionStates.containsKey(coll)) {
+//        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+//        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+//        if (cachedCollection != null) {
+//          int localVersion = cachedCollection.getZNodeVersion();
+//          if (cachedCollection.hasStateUpdates()) {
+//            if (localVersion == version) {
+//              return cachedCollection;
+//            }
+//          } else {
+//            if (localVersion == version) {
+//              return cachedCollection;
+//            }
+//          }
+//        }
+//      }
 
     } else {
       return null;
     }
-    if (log.isDebugEnabled()) log.debug("getting latest state.json");
+    log.debug("getting latest state.json");
     Stat stat = new Stat();
     byte[] data;
     try {
       data = zkClient.getData(collectionPath, null, stat, true);
     } catch (NoNodeException e) {
+      log.debug("no state.json znode found");
       return null;
     }
-    if (data == null) return null;
-    ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
-    ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
-    DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+
+    if (data == null) {
+      log.debug("no data found at state.json node");
+      return null;
+    }
+    DocCollection docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
 
     return docCollection;
   }
@@ -2084,6 +2056,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
+        v.coreRefCount.incrementAndGet();
         return v;
       }
       v.coreRefCount.incrementAndGet();
@@ -2187,11 +2160,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
-        log.debug("creating new Collection State watcher for {}", collection);
+        log.debug("creating CollectionStateWatcher and refreshing for {}", collection);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
-        log.debug("creating watches and refreshing state watcher for {}", collection);
+
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
@@ -2214,7 +2187,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   }
 
-  private DocCollection getCollectionOrNull(String collection) {
+  public DocCollection getCollection(String collection) {
+    DocCollection coll = getCollectionOrNull(collection);
+    if (coll == null) throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection : " + collection + " collections=" + clusterState.keySet());
+    return coll;
+  }
+
+  public DocCollection getCollectionOrNull(String collection) {
     ClusterState.CollectionRef coll = clusterState.get(collection);
     if (coll == null) return null;
     return coll.get();
@@ -2275,11 +2254,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
-    log.info("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
+    log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
         clusterState.size());
 
-    log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
-        clusterState.keySet());
     assert collection != null;
     CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
 
@@ -2378,7 +2355,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @see #registerDocCollectionWatcher
    */
   public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
-    if (log.isDebugEnabled()) log.debug("remove watcher for collection {}", collection);
+    log.trace("remove watcher for collection {}", collection);
 
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
@@ -2388,18 +2365,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) return null;
-      v.stateWatchers.remove(watcher);
       if (v.canBeRemoved()) {
-        log.debug("no longer watch collection {}", collection);
+        log.trace("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
         clusterState.put(collection, docRef);
-//        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
-//        if (stateWatcher != null) {
-//          IOUtils.closeQuietly(stateWatcher);
-//          stateWatcher.removeWatch();
-//        }
+        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+        if (stateWatcher != null) {
+          stateWatcher.removeWatch();
+          IOUtils.closeQuietly(stateWatcher);
+        }
         reconstructState.set(true);
         return null;
       }
@@ -2529,7 +2505,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   private void notifyStateWatchers(String collection, DocCollection collectionState) {
-    if (log.isDebugEnabled()) log.debug("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
+    log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
 
     try {
       notifications.submit(new Notification(collection, collectionState, collectionWatches));
@@ -2572,7 +2548,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       try (ParWork work = new ParWork(this)) {
         watchers.forEach(watcher -> {
           work.collect("", () -> {
-            log.debug("Notify DocCollectionWatcher {} {}", watcher, collectionState);
+            log.trace("Notify DocCollectionWatcher {} {}", watcher, collectionState);
             try {
               if (watcher.onStateChanged(collectionState)) {
                 removeDocCollectionWatcher(collection, watcher);
@@ -2875,7 +2851,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         return false;
       Collection<Slice> activeSlices = collectionState.getActiveSlices();
 
-      log.debug("active slices expected={} {} {} allSlices={}",expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
+      log.trace("active slices expected={} {} {} allSlices={}", expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
 
       if (!exact) {
         if (activeSlices.size() < expectedShards) {
@@ -2888,16 +2864,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
 
       if (expectedReplicas == 0 && !exact) {
-        log.info("0 replicas expected and found, return");
+        log.debug("0 replicas expected and found, return");
         return true;
       }
 
       int activeReplicas = 0;
       for (Slice slice : activeSlices) {
         Replica leader = slice.getLeader();
-        log.info("slice is {} and leader is {}", slice.getName(), leader);
+        log.trace("slice is {} and leader is {}", slice.getName(), leader);
         if (leader == null) {
-          log.info("slice={}", slice);
+          log.debug("slice={}", slice);
           return false;
         }
         for (Replica replica : slice) {
@@ -2905,7 +2881,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             activeReplicas++;
           }
         }
-        log.info("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
+        log.trace("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
       }
       if (!exact) {
         if (activeReplicas >= expectedReplicas) {
@@ -2934,7 +2910,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   protected void checkShardConsistency(String collection, boolean verbose)
       throws Exception {
 
-    Set<String> theShards = getClusterState().getCollection(collection).getSlicesMap().keySet();
+    Set<String> theShards = getCollection(collection).getSlicesMap().keySet();
     String failMessage = null;
     for (String shard : theShards) {
       String shardFailMessage = checkShardConsistency(collection, shard, false, verbose);
@@ -2970,7 +2946,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     if (verbose) System.err.println("__________________________\n");
     int cnt = 0;
 
-    DocCollection coll = getClusterState().getCollection(collection);
+    DocCollection coll = getCollection(collection);
 
     Slice replicas = coll.getSlice(shard);
 


[lucene-solr] 01/02: @1466 Quick cleanup for visitors - my office needs the same.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit a04809475e83ba79a2ca7d70d78b661d54059b84
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Mar 12 15:16:56 2021 -0600

    @1466 Quick cleanup for visitors - my office needs the same.
    
    Took 1 hour 2 minutes
---
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java      |  2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 78 ++++++++++++++--------
 .../org/apache/solr/cloud/MockZkStateReader.java   |  4 +-
 3 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 88ea75e..0815c69 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -171,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
       assertTrue(String.valueOf(indexThread.getFailCount()), indexThread.getFailCount() < 10);
     }
 
-    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
       if (collectionState == null) return false;
       Collection<Slice> slices = collectionState.getSlices();
       for (Slice slice : slices) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 0c7f14e..12d2671 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -940,7 +940,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         IOUtils.closeQuietly(stateWatcher);
         stateWatcher.removeWatch();
       });
-      stateWatchersMap.clear();
 
       IOUtils.closeQuietly(this.liveNodesWatcher);
       IOUtils.closeQuietly(this.collectionsChildWatcher);
@@ -948,17 +947,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         IOUtils.closeQuietly(zkClient);
       }
 
-      //      if (notifications != null) {
-      //        notifications.shutdownNow();
-      //      }
-
-      //      waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
-      //      waitLatches.clear();
-
+      stateWatchersMap.clear();
     } finally {
       assert ObjectReleaseTracker.release(this);
     }
-
   }
 
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
@@ -2084,16 +2076,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       return;
     }
 
-    AtomicBoolean reconstructState = new AtomicBoolean(false);
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
-        reconstructState.set(true);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
+        return v;
       }
       v.coreRefCount.incrementAndGet();
       return v;
@@ -2187,8 +2178,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * <code>onStateChanged</code> returns <code>true</code>
    * </p>
    */
-  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
-    if (log.isDebugEnabled()) log.debug("registerDocCollectionWatcher {}", collection);
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher docCollectionWatcher) {
+    log.debug("registerDocCollectionWatcher {}", collection);
 
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
@@ -2196,14 +2187,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
+        log.debug("creating new Collection State watcher for {}", collection);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
+        log.debug("creating watches and refreshing state watcher for {}", collection);
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
       }
-      v.stateWatchers.add(stateWatcher);
+      log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
+      v.stateWatchers.add(docCollectionWatcher);
       return v;
     });
 
@@ -2214,8 +2208,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 //      state = clusterState.getCollectionOrNull(collection);
 //    }
 
-    if (stateWatcher.onStateChanged(state) == true) {
-      removeDocCollectionWatcher(collection, stateWatcher);
+    if (docCollectionWatcher.onStateChanged(state) == true) {
+      removeDocCollectionWatcher(collection, docCollectionWatcher);
     }
 
   }
@@ -2396,16 +2390,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (v == null) return null;
       v.stateWatchers.remove(watcher);
       if (v.canBeRemoved()) {
-        log.info("no longer watch collection {}", collection);
+        log.debug("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
         clusterState.put(collection, docRef);
-        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
-        if (stateWatcher != null) {
-          IOUtils.closeQuietly(stateWatcher);
-          stateWatcher.removeWatch();
-        }
+//        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+//        if (stateWatcher != null) {
+//          IOUtils.closeQuietly(stateWatcher);
+//          stateWatcher.removeWatch();
+//        }
         reconstructState.set(true);
         return null;
       }
@@ -2442,16 +2436,42 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         return true;
       }
 
-      if (live) {
-        return true;
-      }
+//      if (live) {
+//        return true;
+//      }
 
-      watchedCollectionStates.put(coll, newState);
-      if (!collectionWatches.containsKey(coll)) {
-        lazyCollectionStates.remove(coll);
+      boolean updated = false;
+      // CAS update loop
+      while (true) {
+        if (!collectionWatches.containsKey(coll)) {
+          break;
+        }
+        DocCollection oldState = watchedCollectionStates.get(coll);
+        if (oldState == null) {
+          if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
+            if (log.isDebugEnabled()) {
+              log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
+            }
+            updated = true;
+            break;
+          }
+        } else {
+          if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+            // no change to state, but we might have been triggered by the addition of a
+            // state watcher, so run notifications
+            updated = true;
+            break;
+          }
+          if (watchedCollectionStates.replace(coll, oldState, newState)) {
+            if (log.isDebugEnabled()) {
+              log.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
+            }
+            updated = true;
+            break;
+          }
+        }
       }
 
-
       return true;
     } catch (Exception e) {
       log.error("Failing updating clusterstate", e);
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java b/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
index 01dec842..dea2a7f 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
@@ -46,9 +46,9 @@ public class MockZkStateReader extends ZkStateReader {
   }
 
   @Override
-  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher docCollectionWatcher) {
     // the doc collection will never be changed by this mock
     // so we just call onStateChanged once with the existing DocCollection object an return
-    stateWatcher.onStateChanged(clusterState.get(collection).get());
+    docCollectionWatcher.onStateChanged(clusterState.get(collection).get());
   }
 }