You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/18 20:16:36 UTC

[lucene-solr] branch reference_impl updated: @1468 Stress test addressing and cleanup, bring back publish recovery and down node.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new b8f7753  @1468 Stress test addressing and cleanup, bring back publish recovery and down node.
b8f7753 is described below

commit b8f77535bf66792d0be1624414bc23959f6b7216
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 18 15:15:52 2021 -0500

    @1468 Stress test addressing and cleanup, bring back publish recovery and down node.
---
 .../client/solrj/embedded/JettySolrRunner.java     |   5 +-
 .../java/org/apache/solr/cloud/LeaderElector.java  |   6 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   | 102 ++-
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |   3 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java    | 120 ++-
 .../java/org/apache/solr/cloud/StatePublisher.java | 103 ++-
 .../java/org/apache/solr/cloud/ZkController.java   | 186 +++--
 .../cloud/api/collections/CreateCollectionCmd.java |  13 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |   2 +
 .../OverseerCollectionMessageHandler.java          |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 294 ++++---
 .../java/org/apache/solr/core/CoreContainer.java   | 255 +++---
 .../src/java/org/apache/solr/core/SolrCore.java    |   7 +-
 .../src/java/org/apache/solr/core/SolrCores.java   |   7 +-
 .../java/org/apache/solr/handler/IndexFetcher.java |   2 +-
 .../org/apache/solr/handler/admin/ColStatus.java   |   5 +-
 .../solr/handler/admin/CollectionsHandler.java     |   3 +
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  41 +-
 .../handler/component/RealTimeGetComponent.java    |  10 +-
 .../apache/solr/rest/ManagedResourceStorage.java   |   4 +-
 .../apache/solr/schema/ZkIndexSchemaReader.java    |   3 +-
 .../java/org/apache/solr/servlet/HttpSolrCall.java |  47 +-
 .../apache/solr/servlet/SolrDispatchFilter.java    |  29 +-
 .../org/apache/solr/update/SolrCmdDistributor.java |  12 +-
 .../org/apache/solr/update/SolrIndexWriter.java    |   3 +-
 .../src/java/org/apache/solr/update/UpdateLog.java |   2 +-
 .../processor/DistributedUpdateProcessor.java      |   2 +-
 .../processor/DistributedZkUpdateProcessor.java    |  27 +-
 .../solr/util/plugin/AbstractPluginLoader.java     |  57 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  15 +-
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |  19 +-
 .../org/apache/solr/cloud/DeleteShardTest.java     |   2 +
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   |   2 +-
 .../test/org/apache/solr/cloud/RecoveryZkTest.java |  28 +-
 .../apache/solr/cloud/SolrCloudBridgeTestCase.java |   2 +-
 .../test/org/apache/solr/cloud/SyncSliceTest.java  |   5 +-
 .../apache/solr/cloud/TestCloudDeleteByQuery.java  |   2 +
 .../org/apache/solr/cloud/TestCloudRecovery.java   |  16 +-
 .../org/apache/solr/cloud/TestCloudRecovery2.java  |   3 +-
 .../org/apache/solr/cloud/TestPullReplica.java     |   7 +-
 .../org/apache/solr/cloud/TestSegmentSorting.java  |   2 +-
 .../cloud/TestWaitForStateWithJettyShutdowns.java  |   2 +-
 .../api/collections/CollectionReloadTest.java      |   1 +
 .../CreateCollectionsIndexAndRestartTest.java      |  14 +-
 .../solr/core/CachingDirectoryFactoryTest.java     |   2 +-
 .../test/org/apache/solr/core/TestConfigSets.java  |   4 +-
 .../solr/handler/component/SearchHandlerTest.java  |   2 +
 .../org/apache/solr/rest/SolrRestletTestBase.java  |   2 +
 .../rest/schema/TestUniqueKeyFieldResource.java    |   3 +-
 solr/server/etc/jetty-http.xml                     |   2 +-
 solr/server/etc/jetty-https.xml                    |   2 +-
 .../apache/solr/client/solrj/cloud/ShardTerms.java |   7 +-
 .../client/solrj/impl/BaseCloudSolrClient.java     | 175 ++--
 .../solr/client/solrj/impl/Http2SolrClient.java    |  25 +-
 .../apache/solr/common/cloud/DocCollection.java    |  17 +-
 .../java/org/apache/solr/common/cloud/Replica.java |   1 -
 .../java/org/apache/solr/common/cloud/Slice.java   |   5 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |   5 +
 .../apache/solr/common/cloud/ZkStateReader.java    | 913 ++++++++++++---------
 .../solr/common/util/ObjectReleaseTracker.java     |   2 +-
 .../solr/common/util/SolrQueuedThreadPool.java     |   2 +-
 .../src/java/org/apache/solr/SolrTestCase.java     |   2 +-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |   5 +-
 63 files changed, 1532 insertions(+), 1116 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0784133..4eede2c 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -20,6 +20,7 @@ import org.apache.lucene.util.Constants;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -720,7 +721,7 @@ public class JettySolrRunner implements Closeable {
           ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
 
           try {
-            if (!reader.isClosed() && reader.getZkClient().isConnected()) {
+            if (reader != null && !reader.isClosed() && reader.getZkClient().isConnected()) {
               reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
             }
           } catch (InterruptedException e) {
@@ -966,7 +967,7 @@ public class JettySolrRunner implements Closeable {
     public void close() throws IOException {
       try {
         zkClient.removeWatches(ZkStateReader.COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 71873ce..942c6d7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -282,7 +282,7 @@ public class LeaderElector implements Closeable {
       if (success) {
         state = LEADER;
       } else {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed becoming leader");
+        log.warn("Failed becoming leader {}", context.leaderProps);
       }
     } finally {
       if (!success) {
@@ -451,7 +451,7 @@ public class LeaderElector implements Closeable {
   }
 
   private boolean shouldRejectJoins() {
-    return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || isClosed;
+    return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || zkClient.isClosed();
   }
 
   @Override
@@ -580,7 +580,7 @@ public class LeaderElector implements Closeable {
       this.closed = true;
       try {
         zkClient.removeWatches(watchedNode, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 5ae5e4b..384d486 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -69,11 +69,11 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -789,7 +789,7 @@ public class Overseer implements SolrCloseable {
     private void closeWatcher() {
       try {
         zkController.getZkClient().removeWatches(path, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
@@ -808,7 +808,7 @@ public class Overseer implements SolrCloseable {
 
       zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
       startItems = super.getItems();
-      log.info("Overseer found entries on start {}", startItems);
+      log.info("Overseer found entries on start {} {}", startItems, path);
       if (startItems.size() > 0) {
         processQueueItems(startItems, true);
       }
@@ -816,11 +816,12 @@ public class Overseer implements SolrCloseable {
 
     @Override
     protected void processQueueItems(List<String> items, boolean onStart) {
-      if (closed) return;
+      //if (closed) return;
       List<String> fullPaths = new ArrayList<>(items.size());
       CountDownLatch delCountDownLatch = null;
       ourLock.lock();
       String forceWrite = null;
+      boolean wroteUpdates = false;
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -829,16 +830,6 @@ public class Overseer implements SolrCloseable {
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
 
-        if (fullPaths.size() > 0) {
-          if (!zkController.getZkClient().isClosed()) {
-            try {
-              delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
-            } catch (Exception e) {
-              log.warn("Failed deleting processed items", e);
-            }
-          }
-        }
-
         final List<StateEntry> shardStateCollections = new ArrayList<>();
 
         for (byte[] item : data.values()) {
@@ -846,13 +837,14 @@ public class Overseer implements SolrCloseable {
           try {
             StateEntry entry = new StateEntry();
             entry.message = message;
+            log.debug("add state update {}", message);
             shardStateCollections.add(entry);
 
           } catch (Exception e) {
             log.error("Overseer state update queue processing failed", e);
           }
         }
-        Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates = new HashMap<>();
+        Map<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> collStateUpdates = new ConcurrentHashMap<>();
 
         for (Overseer.StateEntry sentry : shardStateCollections) {
           try {
@@ -875,60 +867,69 @@ public class Overseer implements SolrCloseable {
                     }
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
-                      List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                      ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
                       if (updates == null) {
-                        updates = new ArrayList<>();
+                        updates = new ConcurrentHashMap<>( );
                         collStateUpdates.put(collId, updates);
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
                         if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
-                          if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
+                          if (log.isDebugEnabled()) log.debug("set down node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
                           update.state = Replica.State.getShortState(Replica.State.DOWN);
-                          updates.add(update);
+                          updates.put(update.id, update);
                         }
                       }
                     });
                   } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
                     Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
                       String collId = Long.toString(docColl.getId());
-                      List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                      ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
                       if (updates == null) {
-                        updates = new ArrayList<>();
+                        updates = new ConcurrentHashMap<>();
                         collStateUpdates.put(collId, updates);
                       }
                       List<Replica> replicas = docColl.getReplicas();
                       for (Replica replica : replicas) {
                         if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
-                          if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
+                          if (log.isDebugEnabled()) log.debug("set recovery node operation {} for replica {}", op, replica);
                           ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                           update.id = replica.getId();
                           update.state = Replica.State.getShortState(Replica.State.RECOVERING);
-                          updates.add(update);
+                          updates.put(update.id, update);
                         }
                       }
                     });
-                  } else {
-                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
-                    String id = stateUpdateEntry.getKey();
+                  }
 
-                    String stateString = (String) stateUpdateEntry.getValue();
-                    if (log.isDebugEnabled()) {
-                      log.debug("stateString={}", stateString);
-                    }
-                    String collId = id.substring(0, id.indexOf('-'));
-                    List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
-                    if (updates == null) {
-                      updates = new ArrayList<>();
-                      collStateUpdates.put(collId, updates);
+                  for (Map.Entry<String,Object> stateUpdateEntry2 : stateUpdateMessage.getProperties().entrySet()) {
+                    //  if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
+                    if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey())) || OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey()))) {
+                      continue;
                     }
+                    String id = stateUpdateEntry2.getKey();
+
+                    String stateString = (String) stateUpdateEntry2.getValue();
+
+                    log.trace("stateString={}", stateString);
+
+                    try {
+                      String collId = id.substring(0, id.indexOf('-'));
+                      ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                      if (updates == null) {
+                        updates = new ConcurrentHashMap<>();
+                        collStateUpdates.put(collId, updates);
+                      }
 
-                    ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
-                    update.id = id;
-                    update.state = stateString;
-                    updates.add(update);
+                      ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+                      update.id = id;
+                      update.state = stateString;
+                      updates.put(id, update);
+                    } catch (Exception e) {
+                      log.error("error processing state update {} {}", id, stateString);
+                    }
                   }
                 }
 
@@ -945,9 +946,9 @@ public class Overseer implements SolrCloseable {
                 Long collIdLong = zkStateWriter.getCS().get(collection).getId();
                 if (collIdLong != null) {
                   String collId = Long.toString(collIdLong);
-                  List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+                  ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
                   if (updates == null) {
-                    updates = new ArrayList<>();
+                    updates = new ConcurrentHashMap<>();
                     collStateUpdates.put(collId, updates);
                   }
 
@@ -958,7 +959,7 @@ public class Overseer implements SolrCloseable {
                       ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
                       update.sliceState = (String) stateUpdateEntry.getValue();
                       update.sliceName = stateUpdateEntry.getKey();
-                      updates.add(update);
+                      updates.put(update.sliceName, update);
                     }
                   }
                 }
@@ -989,11 +990,24 @@ public class Overseer implements SolrCloseable {
         if (collections.size() == 0 && forceWrite != null) {
           overseer.writePendingUpdates(forceWrite);
         }
-
-      } finally {
+        wroteUpdates = true;
+      } catch (Exception e) {
+        log.error("Exception handling Overseer state updates",e);
+      }  finally {
         try {
+          if (fullPaths.size() > 0 && wroteUpdates) {
+            if (!zkController.getZkClient().isClosed()) {
+              try {
+                delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
+              } catch (Exception e) {
+                log.warn("Failed deleting processed items", e);
+              }
+            }
+          }
+
           if (delCountDownLatch != null) {
             try {
+
               boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
 
               if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 618430c..774cd74 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud;
 
 import com.codahale.metrics.Timer;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -214,7 +215,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     public void close() {
       try {
         zkClient.removeWatches(path, this, WatcherType.Any, true);
-      }  catch (KeeperException.NoWatcherException e) {
+      }  catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 88e138e..013e285 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -77,6 +77,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class RecoveryStrategy implements Runnable, Closeable {
 
+  private final String collection;
+  private final String shard;
   private volatile CountDownLatch latch;
   private volatile ReplicationHandler replicationHandler;
   private volatile Http2SolrClient recoveryOnlyClient;
@@ -107,7 +109,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   private volatile int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer
       .getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 0);
-  private volatile int maxRetries = 500;
+  private volatile int maxRetries = Integer.getInteger("solr.recovery.maxretries", 500);
   private volatile int startingRecoveryDelayMilliSeconds = Integer
       .getInteger("solr.cloud.starting-recovery-delay-milli-seconds", 0);
 
@@ -134,6 +136,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
     // ObjectReleaseTracker.track(this);
     this.cc = cc;
     this.coreName = cd.getName();
+    this.collection = cd.getCloudDescriptor().getCollectionName();
+    this.shard  = cd.getCloudDescriptor().getShardId();
 
     this.recoveryListener = recoveryListener;
     zkController = cc.getZkController();
@@ -353,13 +357,21 @@ public class RecoveryStrategy implements Runnable, Closeable {
           // expected
         }
 
-        Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 3000, false);
+        LeaderElector leaderElector = zkController.getLeaderElector(coreName);
 
-        if (leader != null && leader.getName().equals(coreName)) {
+        if (leaderElector != null && leaderElector.isLeader()) {
           log.info("We are the leader, STOP recovery");
           close = true;
           return;
         }
+
+        Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
+
+        if (leader != null && leader.getName().equals(coreName)) {
+          log.info("We are the leader in cluster state, REPEAT recovery");
+          Thread.sleep(50);
+          continue;
+        }
         if (core.isClosing() || core.getCoreContainer().isShutDown()) {
           log.info("We are closing, STOP recovery");
           close = true;
@@ -408,8 +420,21 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         try {
 
-          leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, true);
+          LeaderElector leaderElector = zkController.getLeaderElector(coreName);
+
+          if (leaderElector != null && leaderElector.isLeader()) {
+            log.info("We are the leader, STOP recovery");
+            close = true;
+            return false;
+          }
+
+          leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
 
+          if (leader != null && leader.getName().equals(coreName)) {
+            log.info("We are the leader in cluster state, REPEAT recovery");
+            Thread.sleep(50);
+            continue;
+          }
 
           if (leader != null && leader.getName().equals(coreName)) {
             log.info("We are the leader, STOP recovery");
@@ -603,15 +628,22 @@ public class RecoveryStrategy implements Runnable, Closeable {
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
 
+        LeaderElector leaderElector = zkController.getLeaderElector(coreName);
 
-        leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, true);
-
-        if (leader != null && leader.getName().equals(coreName)) {
+        if (leaderElector != null && leaderElector.isLeader()) {
           log.info("We are the leader, STOP recovery");
           close = true;
           return false;
         }
 
+        leader = zkController.getZkStateReader().getLeaderRetry(core.getCoreDescriptor().getCollectionName(), core.getCoreDescriptor().getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
+
+        if (leader != null && leader.getName().equals(coreName)) {
+          log.info("We are the leader in cluster state, REPEAT recovery");
+          Thread.sleep(50);
+          continue;
+        }
+
         log.debug("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
@@ -676,13 +708,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
           didReplication = true;
           try {
 
-            //        try {
-            //          if (prevSendPreRecoveryHttpUriRequest != null) {
-            //            prevSendPreRecoveryHttpUriRequest.cancel();
-            //          }
-            //        } catch (NullPointerException e) {
-            //          // okay
-            //        }
+            try {
+              if (prevSendPreRecoveryHttpUriRequest != null) {
+                prevSendPreRecoveryHttpUriRequest.cancel();
+              }
+            } catch (NullPointerException e) {
+              // okay
+            }
+            log.debug("Begin buffering updates. core=[{}]", coreName);
+            // recalling buffer updates will drop the old buffer tlog
+            ulog.bufferUpdates();
 
             sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().
                 getCollection(core.getCoreDescriptor().getCollectionName()).getSlice(cloudDesc.getShardId()), core.getCoreDescriptor());
@@ -906,8 +941,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     return close || cc.isShutDown();
   }
 
-  final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice, CoreDescriptor coreDescriptor)
-      throws SolrServerException, IOException {
+  final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice, CoreDescriptor coreDescriptor) {
 
     if (coreDescriptor.getCollectionName() == null) {
       throw new IllegalStateException("Collection name cannot be null");
@@ -923,7 +957,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     log.info("Sending prep recovery command to {} for leader={} params={}", leaderBaseUrl, leaderCoreName, prepCmd.getParams());
 
     int conflictWaitMs = zkController.getLeaderConflictResolveWait();
-    int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "1000"));
+    int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
 
     try (Http2SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).withHttpClient(cc.getUpdateShardHandler().
         getRecoveryOnlyClient()).idleTimeout(readTimeout).markInternalRequest().build()) {
@@ -931,20 +965,19 @@ public class RecoveryStrategy implements Runnable, Closeable {
       prepCmd.setBasePath(leaderBaseUrl);
 
       latch = new CountDownLatch(1);
-      Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener(latch));
+      Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener(latch, leaderCoreName));
       try {
         prevSendPreRecoveryHttpUriRequest = result;
         try {
-          boolean success = latch.await(15, TimeUnit.SECONDS);
+          boolean success = latch.await(readTimeout, TimeUnit.MILLISECONDS);
           if (!success) {
-            result.cancel();
-            throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Timeout waiting for prep recovery cmd on leader");
+            //result.cancel();
+            log.warn("Timeout waiting for prep recovery cmd on leader {}", leaderCoreName);
           }
         } catch (InterruptedException e) {
           close = true;
           ParWork.propagateInterrupt(e);
         } finally {
-          prevSendPreRecoveryHttpUriRequest = null;
           latch = null;
         }
       } finally {
@@ -953,12 +986,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
   }
 
-  private static class NamedListAsyncListener implements AsyncListener<NamedList<Object>> {
+  private class NamedListAsyncListener implements AsyncListener<NamedList<Object>> {
 
     private final CountDownLatch latch;
+    private final String leaderCoreName;
 
-    public NamedListAsyncListener(CountDownLatch latch) {
+    public NamedListAsyncListener(CountDownLatch latch, String leaderCoreName) {
       this.latch = latch;
+      this.leaderCoreName = leaderCoreName;
     }
 
     @Override
@@ -968,15 +1003,46 @@ public class RecoveryStrategy implements Runnable, Closeable {
       } catch (NullPointerException e) {
 
       }
+      prevSendPreRecoveryHttpUriRequest = null;
     }
 
     @Override
     public void onFailure(Throwable throwable, int code) {
-      try {
-        latch.countDown();
-      } catch (NullPointerException e) {
+      log.info("failed sending prep recovery cmd to leader");
+
+      if (throwable.getMessage().contains("Not the valid leader")) {
+        try {
+          try {
+            Thread.sleep(250);
+            cc.getZkController().getZkStateReader().waitForState(RecoveryStrategy.this.collection, 3, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+              if (collectionState == null) {
+                return false;
+              }
+              Slice slice = collectionState.getSlice(shard);
+              if (slice == null) {
+                return false;
+              }
+              if (slice.getLeader() == null) {
+                return false;
+              }
+              if (slice.getLeader().getName() == leaderCoreName) {
+                return false;
+              }
+              return true;
+            });
+          } catch (Exception e) {
 
+          }
+        } finally {
+          try {
+            latch.countDown();
+          } catch (NullPointerException e) {
+
+          }
+          prevSendPreRecoveryHttpUriRequest = null;
+        }
       }
+
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 8491334..8fe7abb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -17,7 +17,6 @@
 package org.apache.solr.cloud;
 
 import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
@@ -27,8 +26,8 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class StatePublisher implements Closeable {
   private static final Logger log = LoggerFactory
@@ -62,10 +60,10 @@ public class StatePublisher implements Closeable {
 
   public static class NoOpMessage extends ZkNodeProps {
   }
-
+  static final String PREFIX = "qn-";
   public static final NoOpMessage TERMINATE_OP = new NoOpMessage();
 
-  private final BlockingArrayQueue<ZkNodeProps> workQueue = new BlockingArrayQueue(64, 16);
+  private final ArrayBlockingQueue<ZkNodeProps> workQueue = new ArrayBlockingQueue<>(1024, true);
   private final ZkDistributedQueue overseerJobQueue;
   private volatile Worker worker;
   private volatile Future<?> workerFuture;
@@ -80,50 +78,60 @@ public class StatePublisher implements Closeable {
     @Override
     public void run() {
 
-      while (!terminated && !zkStateReader.getZkClient().isClosed()) {
-        if (!zkStateReader.getZkClient().isConnected()) {
-          try {
-            zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
-          } catch (TimeoutException e) {
-            continue;
-          } catch (InterruptedException e) {
-            log.error("publisher interrupted", e);
-          }
-          continue;
-        }
+      while (!terminated) {
+//        if (!zkStateReader.getZkClient().isConnected()) {
+//          try {
+//            zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
+//          } catch (TimeoutException e) {
+//            continue;
+//          } catch (InterruptedException e) {
+//            log.error("publisher interrupted", e);
+//          }
+//          continue;
+//        }
 
         ZkNodeProps message = null;
         ZkNodeProps bulkMessage = new ZkNodeProps();
         bulkMessage.getProperties().put(OPERATION, "state");
+        int pollTime = 250;
         try {
           try {
-            message = workQueue.poll(1000, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException e) {
-
+            log.debug("State publisher will poll for 5 seconds");
+            message = workQueue.poll(5000, TimeUnit.MILLISECONDS);
+          } catch (Exception e) {
+            log.warn("state publisher hit exception polling", e);
           }
           if (message != null) {
             log.debug("Got state message " + message);
+
             if (message == TERMINATE_OP) {
               log.debug("State publish is terminated");
               terminated = true;
-              return;
             } else {
-              bulkMessage(message, bulkMessage);
+              if (bulkMessage(message, bulkMessage)) {
+                pollTime = 20;
+              } else {
+                pollTime = 150;
+              }
             }
 
-            while (!terminated) {
+            while (true) {
               try {
-                message = workQueue.poll(100, TimeUnit.MILLISECONDS);
-              } catch (InterruptedException e) {
-                log.warn("state publisher interrupted", e);
-                return;
+                log.debug("State publisher will poll for {} ms", pollTime);
+                message = workQueue.poll(pollTime, TimeUnit.MILLISECONDS);
+              } catch (Exception e) {
+                log.warn("state publisher hit exception polling", e);
               }
               if (message != null) {
                 if (log.isDebugEnabled()) log.debug("Got state message " + message);
                 if (message == TERMINATE_OP) {
                   terminated = true;
                 } else {
-                  bulkMessage(message, bulkMessage);
+                  if (bulkMessage(message, bulkMessage)) {
+                    pollTime = 10;
+                  } else {
+                    pollTime = 25;
+                  }
                 }
               } else {
                 break;
@@ -133,20 +141,21 @@ public class StatePublisher implements Closeable {
 
           if (bulkMessage.getProperties().size() > 1) {
             processMessage(bulkMessage);
+          } else {
+            log.debug("No messages to publish, loop");
           }
 
-        } catch (AlreadyClosedException e) {
-          log.info("StatePublisher run loop hit AlreadyClosedException, exiting ...");
-          return;
+          if (terminated) {
+            log.info("State publisher has terminated");
+            break;
+          }
         } catch (Exception e) {
-          log.error("Exception in StatePublisher run loop, exiting", e);
-          return;
+          log.error("Exception in StatePublisher run loop", e);
         }
       }
     }
 
-    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
-      if (log.isDebugEnabled()) log.debug("Bulk state zkNodeProps={} bulkMessage={}", zkNodeProps, bulkMessage);
+    private boolean bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
       if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
         String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
         //clearStatesForNode(bulkMessage, nodeName);
@@ -168,7 +177,11 @@ public class StatePublisher implements Closeable {
         String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
         if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
         bulkMessage.getProperties().put(id, line);
+        if (state.equals(Replica.State.RECOVERING.toString())) {
+          return true;
+        }
       }
+      return false;
     }
 
     private void clearStatesForNode(ZkNodeProps bulkMessage, String nodeName) {
@@ -195,9 +208,17 @@ public class StatePublisher implements Closeable {
     }
 
     private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
-      log.debug("Send state updates to Overseer {}", message);
+      log.info("Send state updates to Overseer {}", message);
       byte[] updates = Utils.toJSON(message);
-      overseerJobQueue.offer(updates);
+
+      zkStateReader.getZkClient().create("/overseer/queue" + "/" + PREFIX, updates, CreateMode.PERSISTENT_SEQUENTIAL, (rc, path, ctx, name, stat) -> {
+        if (rc != 0) {
+          log.error("got zk error creating path {} {}", path, rc);
+          KeeperException e = KeeperException.create(KeeperException.Code.get(rc), path);
+          log.error("Exception publish state messages path=" + path, e);
+          if (!workQueue.offer(message)) log.error("failed to requeue state message after zk error, queue full path={}", path);
+        }
+      });
     }
   }
 
@@ -240,10 +261,11 @@ public class StatePublisher implements Closeable {
 
             CacheEntry lastState = stateCache.get(id);
             //&& (System.currentTimeMillis() - lastState.time < 1000) &&
-            if (!state.equals("leader") && collection != null && lastState != null && replica != null && !state.equals(lastState.state) && replica.getState().toString().equals(state)) {
-              log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
-              return;
-            }
+            // TODO: needs work
+//            if (state.equals(lastState.state)) {
+//              log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+//              return;
+//            }
           }
 
           if (id == null) {
@@ -294,6 +316,7 @@ public class StatePublisher implements Closeable {
           }
 
         } else {
+          log.error("illegal state message {}", stateMessage.toString());
           throw new IllegalArgumentException(stateMessage.toString());
         }
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 680e57c..c49425c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -21,7 +21,9 @@ import org.apache.solr.client.solrj.cloud.DistributedLock;
 import org.apache.solr.client.solrj.cloud.LockListener;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
@@ -50,6 +52,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.URLUtil;
@@ -59,7 +62,6 @@ import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrCoreInitializationException;
 import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.packagemanager.PackageUtils;
@@ -324,6 +326,8 @@ public class ZkController implements Closeable, Runnable {
         if (zkController.cc.getAllCoreNames().contains(descriptor.getName())) {
           try {
             zkController.register(descriptor.getName(), descriptor, afterExpiration);
+          } catch (AlreadyClosedException e) {
+            log.warn("Error registering core name={} afterExpiration={}", descriptor.getName(), afterExpiration, e);
           } catch (Exception e) {
             log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
           }
@@ -1080,7 +1084,6 @@ public class ZkController implements Closeable, Runnable {
 
       try {
         this.overseerRunningMap = Overseer.getRunningMap(zkClient);
-
         this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
         this.overseerFailureMap = Overseer.getFailureMap(zkClient);
         this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
@@ -1097,18 +1100,16 @@ public class ZkController implements Closeable, Runnable {
       this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(), getNodeName(), zkStateReader);
       overseerElector = new LeaderElector(this);
       //try (ParWork worker = new ParWork(this, false, true)) {
-        // start the overseer first as following code may need it's processing
-       // worker.collect("startOverseer", () -> {
-          ElectionContext context = getOverseerContext();
-          if (log.isDebugEnabled()) log.debug("Overseer setting up context {}", context.leaderProps.getNodeName());
-          overseerElector.setup(context);
+      // start the overseer first as following code may need its processing
+      // worker.collect("startOverseer", () -> {
+      ElectionContext context = getOverseerContext();
+      if (log.isDebugEnabled()) log.debug("Overseer setting up context {}", context.leaderProps.getNodeName());
+      overseerElector.setup(context);
 
-          log.info("Overseer joining election {}", context.leaderProps.getNodeName());
-          overseerElector.joinElection(false);
+      log.info("Overseer joining election {}", context.leaderProps.getNodeName());
+      overseerElector.joinElection(false);
 
-      zkStateReader.forciblyRefreshAllClusterStateSlow();
       publishNodeAs(getNodeName(), OverseerAction.RECOVERYNODE);
-
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -1205,16 +1206,18 @@ public class ZkController implements Closeable, Runnable {
     }
   }
 
-  public void removeEphemeralLiveNode() throws KeeperException {
-    log.info("Removing our ephemeral live node");
-    String nodeName = getNodeName();
-    String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
-    try {
-      zkClient.delete(nodePath, -1, true, false);
-    } catch (NoNodeException | SessionExpiredException e) {
-      // okay
-    } catch (Exception e) {
-      log.warn("Could not remove ephemeral live node {}", nodePath, e);
+  public void removeEphemeralLiveNode() {
+    if (zkClient.isAlive()) {
+      log.info("Removing our ephemeral live node");
+      String nodeName = getNodeName();
+      String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+      try {
+        zkClient.delete(nodePath, -1, true, false);
+      } catch (NoNodeException | SessionExpiredException e) {
+        // okay
+      } catch (Exception e) {
+        log.warn("Could not remove ephemeral live node {}", nodePath, e);
+      }
     }
   }
 
@@ -1255,6 +1258,7 @@ public class ZkController implements Closeable, Runnable {
     }
     MDCLoggingContext.setCoreName(desc.getName());
     ZkShardTerms shardTerms = null;
+   // LeaderElector leaderElector = null;
     LeaderElector leaderElector = null;
     try {
       final String baseUrl = getBaseUrl();
@@ -1262,17 +1266,6 @@ public class ZkController implements Closeable, Runnable {
       final String collection = cloudDesc.getCollectionName();
       final String shardId = cloudDesc.getShardId();
 
-      log.debug("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
-
-      DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
-      if (docCollection != null) {
-        Replica replica = docCollection.getReplica(coreName);
-        if (replica != null && !baseUrl.equals(replica.getBaseUrl())) {
-          log.error("IllegalStateException wrong base url for this node in replica entry replica={}", replica);
-          //throw new IllegalArgumentException("wrong base url for this node in replica entry baseUrl=" + baseUrl + " replica=" + replica);
-        }
-      }
-
       log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
 
       log.debug("Register terms for replica {}", coreName);
@@ -1280,22 +1273,22 @@ public class ZkController implements Closeable, Runnable {
       registerShardTerms(collection, cloudDesc.getShardId(), coreName);
 
       log.info("Create leader elector for replica {}", coreName);
-      leaderElector = leaderElectors.get(coreName);
-      if (leaderElector == null) {
-        leaderElector = new LeaderElector(this);
-        LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
-
-        if (oldElector != null) {
-          IOUtils.closeQuietly(leaderElector);
-        }
-
-        if (cc.isShutDown()) {
-          IOUtils.closeQuietly(leaderElector);
-          IOUtils.closeQuietly(oldElector);
-          IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
-          throw new AlreadyClosedException();
-        }
-      }
+//      leaderElector = leaderElectors.get(coreName);
+//      if (leaderElector == null) {
+//        leaderElector = new LeaderElector(this);
+//        LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
+//
+//        if (oldElector != null) {
+//          IOUtils.closeQuietly(leaderElector);
+//        }
+//
+//        if (cc.isShutDown()) {
+//          IOUtils.closeQuietly(leaderElector);
+//          IOUtils.closeQuietly(oldElector);
+//          IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
+//          throw new AlreadyClosedException();
+//        }
+//      }
 
       // If we're a preferred leader, insert ourselves at the head of the queue
       boolean joinAtHead = false; //replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
@@ -1309,19 +1302,56 @@ public class ZkController implements Closeable, Runnable {
       log.info("Wait to see leader for {}, {}", collection, shardId);
       String leaderName = null;
 
-      for (int i = 0; i < 60; i++) {
+      for (int i = 0; i < 20; i++) {
         if (isClosed() || isDcCalled() || cc.isShutDown()) {
           throw new AlreadyClosedException();
         }
+        leaderElector = leaderElectors.get(coreName);
 
-        if (leaderElector.isLeader()) {
+        if (leaderElector != null && leaderElector.isLeader()) {
           leaderName = coreName;
           break;
         }
         try {
-          Replica leader = zkStateReader.getLeaderRetry(collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 10000), true);
+          Replica leader = zkStateReader.getLeaderRetry(collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 5000));
           leaderName = leader.getName();
 
+          boolean isLeader = leaderName.equals(coreName);
+
+          if (isLeader) {
+            if (leaderElector != null && leaderElector.isLeader()) {
+              break;
+            } else {
+              Thread.sleep(100);
+            }
+          } else {
+            boolean stop = true;
+            CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+            prepCmd.setCoreName(leader.getName());
+            prepCmd.setLeaderName(leader.getName());
+            prepCmd.setCollection(collection);
+            prepCmd.setShardId(shardId);
+
+            int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+
+            try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient()).markInternalRequest().build()) {
+
+              prepCmd.setBasePath(leader.getBaseUrl());
+
+              try {
+                client.request(prepCmd);
+              } catch (Exception e) {
+                log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+                stop = false;
+              }
+            }
+            if (stop) {
+              break;
+            } else {
+              Thread.sleep(100);
+            }
+          }
+
         } catch (TimeoutException timeoutException) {
           if (isClosed() || isDcCalled() || cc.isShutDown()) {
             throw new AlreadyClosedException();
@@ -1332,7 +1362,7 @@ public class ZkController implements Closeable, Runnable {
       }
 
       if (leaderName == null) {
-        log.error("No leader found while trying to register " + coreName + " with zookeeper");
+        log.error("No leader found while trying to register " + coreName + " with zookeeper collection={}", zkStateReader.getCollectionOrNull(collection));
         throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
       }
 
@@ -1502,7 +1532,7 @@ public class ZkController implements Closeable, Runnable {
   }
 
 
-  private void joinElection(CoreDescriptor cd, boolean joinAtHead) {
+  private LeaderElector joinElection(CoreDescriptor cd, boolean joinAtHead) {
     log.info("joinElection {}", cd.getName());
     // look for old context - if we find it, cancel it
     String collection = cd.getCloudDescriptor().getCollectionName();
@@ -1530,7 +1560,7 @@ public class ZkController implements Closeable, Runnable {
     LeaderElector leaderElector;
 
     if (isDcCalled() || isClosed) {
-      return;
+      return null;
     }
     leaderElector = leaderElectors.get(replica.getName());
     if (leaderElector == null) {
@@ -1548,6 +1578,8 @@ public class ZkController implements Closeable, Runnable {
     leaderElector.setup(context);
     log.info("Joining election ...");
     leaderElector.joinElection( false, joinAtHead);
+
+    return leaderElector;
   }
 
 
@@ -1598,24 +1630,24 @@ public class ZkController implements Closeable, Runnable {
       //  props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
       props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
-      try {
-        if (core.getDirectoryFactory().isSharedStorage()) {
-          // MRM TODO: currently doesn't publish anywhere
-          if (core.getDirectoryFactory().isSharedStorage()) {
-            props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
-            props.put("dataDir", core.getDataDir());
-            UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-            if (ulog != null) {
-              props.put("ulogDir", ulog.getLogDir());
-            }
-          }
-        }
-      } catch (SolrCoreInitializationException ex) {
-        // The core had failed to initialize (in a previous request, not this one), hence nothing to do here.
-        if (log.isInfoEnabled()) {
-          log.info("The core '{}' had failed to initialize before.", cd.getName());
-        }
-      }
+//      try {
+//        if (core.getDirectoryFactory().isSharedStorage()) {
+//          // MRM TODO: currently doesn't publish anywhere
+//          if (core.getDirectoryFactory().isSharedStorage()) {
+//            props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
+//            props.put("dataDir", core.getDataDir());
+//            UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+//            if (ulog != null) {
+//              props.put("ulogDir", ulog.getLogDir());
+//            }
+//          }
+//        }
+//      } catch (SolrCoreInitializationException ex) {
+//        // The core had failed to initialize (in a previous request, not this one), hence nothing to do here.
+//        if (log.isInfoEnabled()) {
+//          log.info("The core '{}' had failed to initialize before.", cd.getName());
+//        }
+//      }
 
       // pull replicas are excluded because their terms are not considered
       if ((state == Replica.State.RECOVERING || state == Replica.State.BUFFERING) && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
@@ -1855,6 +1887,14 @@ public class ZkController implements Closeable, Runnable {
     }
   }
 
+  public void clearStatePublisher() {
+    this.statePublisher.clearStatCache();
+  }
+
+  public void clearCachedState(String coreName) {
+    this.statePublisher.clearStatCache(coreName);
+  }
+
   public int getClientTimeout() {
     return clientTimeout;
   }
@@ -2078,7 +2118,7 @@ public class ZkController implements Closeable, Runnable {
         if (log.isDebugEnabled()) log.debug("No more listeners for config directory [{}]", confDir);
         try {
           zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true);
-        } catch (KeeperException.NoWatcherException e) {
+        } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
         } catch (Exception e) {
           log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index be522c6..9438f80 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -398,29 +398,30 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           if (waitForFinalState && (createNodeSet == null || !createNodeSet.equals(ZkStateReader.CREATE_NODE_SET_EMPTY))) {
             try {
               zkStateReader.waitForState(collectionName, CREATE_COLLECTION_TIMEOUT, TimeUnit.SECONDS, (l, c) -> {
+                log.debug("notified cmd {}", c);
                 if (c == null) {
                   return false;
                 }
                 for (String name : coresToCreate.keySet()) {
-                  if (log.isTraceEnabled()) log.trace("look for core {}", name);
+                  log.debug("look for core {} replica={}", name, c.getReplica(name));
                   if (c.getReplica(name) == null || c.getReplica(name).getState() != Replica.State.ACTIVE) {
-                    if (log.isTraceEnabled()) log.trace("not the right replica or state {}", c.getReplica(name));
+                    log.debug("not the right replica or state {}", c.getReplica(name));
                     return false;
                   }
                 }
                 Collection<Slice> slices = c.getSlices();
                 if (slices.size() < shardNames.size()) {
-                  if (log.isTraceEnabled()) log.trace("wrong number slices {} vs {}", slices.size(), shardNames.size());
+                  log.debug("wrong number slices {} vs {}", slices.size(), shardNames.size());
                   return false;
                 }
                 for (Slice slice : slices) {
-                  if (log.isTraceEnabled()) log.trace("slice {} leader={}", slice, slice.getLeader());
+                  log.debug("slice {} leader={}", slice, slice.getLeader());
                   if (slice.getLeader() == null || (slice.getLeader() != null && slice.getLeader().getState() != Replica.State.ACTIVE)) {
-                    if (log.isTraceEnabled()) log.trace("no leader found for slice {}", slice.getName());
+                    log.debug("no leader found for slice {}", slice.getName());
                     return false;
                   }
                 }
-                if (log.isTraceEnabled()) log.trace("return true, everything active");
+                log.debug("return true, everything active");
                 return true;
               });
             } catch (InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 9cb3980..14985f3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -116,6 +116,7 @@ public class DeleteReplicaCmd implements Cmd {
         ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
         ShardHandler finalShardHandler = shardHandler;
         String finalCollectionName = collectionName;
+        String finalCollectionName2 = collectionName;
         response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
           @Override
           public CollectionCmdResponse.Response call() {
@@ -134,6 +135,7 @@ public class DeleteReplicaCmd implements Cmd {
                 log.error("Exception waiting for delete replica response");
               }
             }
+            ocmh.overseer.getZkStateWriter().writePendingUpdates(finalCollectionName2);
             Set<String> replicas = (Set<String>) resp.results.get("replicas_deleted");
             for (String replica : replicas) {
               try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index c6f98da..5673a6d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -1245,7 +1245,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       this.closed = true;
       try {
         zkClient.removeWatches(watchPath, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index cd80b5d..5c98e6b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.overseer;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -36,12 +37,10 @@ import org.apache.solr.cloud.Stats;
 import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
@@ -64,7 +63,7 @@ public class ZkStateWriter {
 
   protected volatile Stats stats;
 
-  private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
+  private final Map<String, ConcurrentHashMap> stateUpdates = new ConcurrentHashMap<>();
 
   Map<Long,String> idToCollection = new ConcurrentHashMap<>(128, 0.75f, 16);
 
@@ -90,7 +89,7 @@ public class ZkStateWriter {
 
   }
 
-  public void enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
+  public void enqueueUpdate(DocCollection docCollection, Map<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
 
     try {
 
@@ -175,8 +174,8 @@ public class ZkStateWriter {
           collState.collLock.unlock();
         }
       } else {
-        if (log.isDebugEnabled()) log.debug("enqueue state change states={}", collStateUpdates);
-        for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+        log.trace("enqueue state change states={}", collStateUpdates);
+        for (Map.Entry<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> entry : collStateUpdates.entrySet()) {
 
           ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
             if (reentrantLock == null) {
@@ -192,12 +191,14 @@ public class ZkStateWriter {
             String collection = idToCollection.get(Long.parseLong(collectionId));
             if (collection == null) {
               log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
+
+              continue;
+              //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
             }
 
-            ZkNodeProps updates = stateUpdates.get(collection);
+            ConcurrentHashMap updates = stateUpdates.get(collection);
             if (updates == null) {
-              updates = new ZkNodeProps();
+              updates = new ConcurrentHashMap();
               stateUpdates.put(collection, updates);
             }
 
@@ -205,46 +206,91 @@ public class ZkStateWriter {
             String csVersion;
             if (docColl != null) {
               csVersion = Integer.toString(docColl.getZNodeVersion());
-              for (StateUpdate state : entry.getValue()) {
+              for (StateUpdate state : entry.getValue().values()) {
                 if (state.sliceState != null) {
                   Slice slice = docColl.getSlice(state.sliceName);
                   if (slice != null) {
                     slice.setState(Slice.State.getState(state.sliceState));
+                    slice.getProperties().put("state", state.sliceState);
                   }
                   dirtyStructure.add(collection);
                   continue;
                 }
 
                 Replica replica = docColl.getReplicaById(state.id);
-                log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+                log.trace("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
                 if (replica != null) {
-                  String setState = Replica.State.shortStateToState(state.state).toString();
-                  log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
-                  if (setState.equals("leader")) {
-                    if (log.isDebugEnabled()) {
-                      log.debug("set leader {}", replica);
-                    }
+
+                  log.trace("zkwriter publish state={} replica={}", state.state, replica.getName());
+                  if (state.state.equals("l")) {
+
+                    log.trace("set leader {}", replica);
+
                     Slice slice = docColl.getSlice(replica.getSlice());
-                    slice.setLeader(replica);
-                    replica.setState(Replica.State.ACTIVE);
-                    replica.getProperties().put("leader", "true");
-                    Collection<Replica> replicas = slice.getReplicas();
-                    for (Replica r : replicas) {
-                      if (r != replica) {
-                        r.getProperties().remove("leader");
+                    Map<String,Replica> replicasMap = slice.getReplicasCopy();
+                    Map properties = new HashMap(replica.getProperties());
+
+                    properties.put("leader",  "true");
+                    properties.put("state", Replica.State.ACTIVE);
+                   // properties.put(replica.getInternalId(), "l");
+                    for (Replica r : replicasMap.values()) {
+                      if (replica.getName().equals(r.getName())) {
+                        continue;
+                      }
+                      log.trace("process non leader {} {}", r, r.getProperty(ZkStateReader.LEADER_PROP));
+                      if ("true".equals(r.getProperties().get(ZkStateReader.LEADER_PROP))) {
+                        log.debug("remove leader prop {}", r);
+                        Map<String,Object> props = new HashMap<>(r.getProperties());
+                        props.remove(ZkStateReader.LEADER_PROP);
+                        Replica newReplica = new Replica(r.getName(), props, collection, docColl.getId(), r.getSlice(), overseer.getZkStateReader());
+                        replicasMap.put(r.getName(), newReplica);
                       }
                     }
-                    updates.getProperties().put(replica.getInternalId(), "l");
+
+                    Replica newReplica = new Replica(replica.getName(), properties, collection, docColl.getId(), replica.getSlice(), overseer.getZkStateReader());
+
+                    replicasMap.put(replica.getName(), newReplica);
+
+                    Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), collection, docColl.getId(), overseer.getZkStateReader());
+
+                    Map<String,Slice> newSlices = docColl.getSlicesCopy();
+                    newSlices.put(slice.getName(), newSlice);
+
+                    log.trace("add new slice leader={} {} {}", newSlice.getLeader(), newSlice, docColl);
+
+                    DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+                    cs.put(collection, newDocCollection);
+                    docColl = newDocCollection;
+                    updates.put(replica.getInternalId(), "l");
                     dirtyState.add(collection);
                   } else {
+                    String setState = Replica.State.shortStateToState(state.state).toString();
                     Replica.State s = Replica.State.getState(setState);
-                    Replica existingLeader = docColl.getSlice(replica).getLeader();
-                    if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
-                      docColl.getSlice(replica).setLeader(null);
-                    }
-                    updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
-                    log.debug("set state {} {}", state, replica);
-                    replica.setState(s);
+
+                    log.trace("set state {} {}", state, replica);
+
+                    Slice slice = docColl.getSlice(replica.getSlice());
+                    Map<String,Replica> replicasMap = slice.getReplicasCopy();
+                    Map properties = new HashMap(replica.getProperties());
+
+                    properties.put("state", s);
+                    properties.remove(ZkStateReader.LEADER_PROP);
+
+                    Replica newReplica = new Replica(replica.getName(), properties, collection, docColl.getId(), replica.getSlice(), overseer.getZkStateReader());
+
+                    replicasMap.put(replica.getName(), newReplica);
+
+                    Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), collection, docColl.getId(), overseer.getZkStateReader());
+
+                    Map<String,Slice> newSlices = docColl.getSlicesCopy();
+                    newSlices.put(slice.getName(), newSlice);
+
+                    log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+
+                    DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+                    cs.put(collection, newDocCollection);
+                    docColl = newDocCollection;
+                    updates.put(replica.getInternalId(), state.state);
                     dirtyState.add(collection);
                   }
                 } else {
@@ -252,24 +298,24 @@ public class ZkStateWriter {
                 }
               }
             } else {
-              for (StateUpdate state : entry.getValue()) {
-                log.warn("Could not find existing collection name={}", collection);
-//                String setState = Replica.State.shortStateToState(state.state).toString();
-//                if (setState.equals("leader")) {
-//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
-//                  dirtyState.add(collection);
-//                } else {
-//                  Replica.State s = Replica.State.getState(setState);
-//                  updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
-//                  dirtyState.add(collection);
-//                }
+              for (StateUpdate state : entry.getValue().values()) {
+                log.debug("Could not find existing collection name={}", collection);
+                String setState = Replica.State.shortStateToState(state.state).toString();
+                if (setState.equals("l")) {
+                  updates.put(state.id.substring(state.id.indexOf('-') + 1), "l");
+                  dirtyState.add(collection);
+                } else {
+                  Replica.State s = Replica.State.getState(setState);
+                  updates.put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+                  dirtyState.add(collection);
+                }
               }
               log.debug("version for state updates 0");
               csVersion = "0";
             }
 
             if (dirtyState.contains(collection)) {
-              updates.getProperties().put("_cs_ver_", csVersion);
+              updates.put("_cs_ver_", csVersion);
             }
 
           } finally {
@@ -373,8 +419,10 @@ public class ZkStateWriter {
 
           Stat stat;
           try {
+
             stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
             collection.setZnodeVersion(finalVersion + 1);
+
             if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
           } catch (KeeperException.NoNodeException e) {
             log.debug("No node found for state.json", e);
@@ -388,13 +436,14 @@ public class ZkStateWriter {
 
           reader.getZkClient().setData(pathSCN, null, -1, true, false);
 
-          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          ConcurrentHashMap updates = stateUpdates.get(collection.getName());
           if (updates != null) {
-            updates.getProperties().clear();
+            updates.clear();
+            writeStateUpdates(collection, updates);
           }
 
         } else if (dirtyState.contains(collection.getName())) {
-          ZkNodeProps updates = stateUpdates.get(collection.getName());
+          ConcurrentHashMap updates = stateUpdates.get(collection.getName());
           if (updates != null) {
             try {
               writeStateUpdates(collection, updates);
@@ -423,9 +472,12 @@ public class ZkStateWriter {
     }
   }
 
-  private void writeStateUpdates(DocCollection collection, ZkNodeProps updates) throws KeeperException, InterruptedException {
+  private void writeStateUpdates(DocCollection collection, ConcurrentHashMap updates) throws KeeperException, InterruptedException {
+    if (updates.size() == 0) {
+      return;
+    }
     String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
-    if (log.isDebugEnabled()) log.debug("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
+    log.trace("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
     try {
       reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
     } catch (KeeperException.NoNodeException e) {
@@ -449,36 +501,48 @@ public class ZkStateWriter {
 
 
   public void removeCollection(String collection) {
-    log.info("Removing collection from zk state {}", collection);
-    ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
-      if (reentrantLock == null) {
-        ColState colState = new ColState();
-        return colState;
-      }
-      return reentrantLock;
-    });
-    collState.collLock.lock();
+    log.debug("Removing collection from zk state {}", collection);
     try {
-      Long id = null;
-      for (Map.Entry<Long, String> entry : idToCollection.entrySet()) {
-        if (entry.getValue().equals(collection)) {
-          id = entry.getKey();
-          break;
+      ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
+        if (reentrantLock == null) {
+          ColState colState = new ColState();
+          return colState;
+        }
+        return reentrantLock;
+      });
+      collState.collLock.lock();
+      try {
+        Long id = null;
+        for (Map.Entry<Long,String> entry : idToCollection.entrySet()) {
+          if (entry.getValue().equals(collection)) {
+            id = entry.getKey();
+            break;
+          }
+        }
+        if (id != null) {
+          idToCollection.remove(id);
         }
-      }
-      if (id != null) {
-        idToCollection.remove(id);
         stateUpdates.remove(collection);
+        DocCollection doc = cs.get(collection);
+
+        if (doc != null) {
+          List<Replica> replicas = doc.getReplicas();
+          for (Replica replica : replicas) {
+            overseer.getCoreContainer().getZkController().clearCachedState(replica.getName());
+          }
+        }
+
         cs.remove(collection);
         assignMap.remove(collection);
         dirtyStructure.remove(collection);
         dirtyState.remove(collection);
-        cs.remove(collection);
+
+      } finally {
+        collState.collLock.unlock();
       }
     } catch (Exception e) {
-      log.error("", e);
-    } finally {
-      collState.collLock.unlock();
+      log.error("Exception removing collection", e);
+
     }
   }
 
@@ -505,57 +569,61 @@ public class ZkStateWriter {
   }
 
   public void init() {
+    try {
+      overseer.getCoreContainer().getZkController().clearStatePublisher();
+      ClusterState readerState = reader.getClusterState();
+      if (readerState != null) {
+        reader.forciblyRefreshAllClusterStateSlow();
+        cs.putAll(readerState.copy().getCollectionsMap());
+      }
 
-    ClusterState readerState = reader.getClusterState();
-    if (readerState != null) {
-      cs.putAll(readerState.copy().getCollectionsMap());
-    }
-
-    long[] highId = new long[1];
-    cs.values().forEach(collection -> {
-      String collectionName = collection.getName();
-      ColState collState = collLocks.compute(collectionName, (s, colState) -> {
-        if (colState == null) {
-          ColState cState = new ColState();
-          return cState;
-        }
-        return colState;
-      });
-      collState.collLock.lock();
-      try {
-
-        if (collection.getId() > highId[0]) {
-          highId[0] = collection.getId();
-        }
-
-        idToCollection.put(collection.getId(), collection.getName());
-
+      long[] highId = new long[1];
+      cs.values().forEach(collection -> {
+        String collectionName = collection.getName();
+        ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+          if (colState == null) {
+            ColState cState = new ColState();
+            return cState;
+          }
+          return colState;
+        });
+        collState.collLock.lock();
+        try {
 
-        DocAssign docAssign = new DocAssign();
-        docAssign.name = collection.getName();
-        assignMap.put(docAssign.name, docAssign);
-        int max = 1;
-        Collection<Slice> slices = collection.getSlices();
-        for (Slice slice : slices) {
-          Collection<Replica> replicas = slice.getReplicas();
+          if (collection.getId() > highId[0]) {
+            highId[0] = collection.getId();
+          }
 
-          for (Replica replica : replicas) {
-            Matcher matcher = Assign.pattern.matcher(replica.getName());
-            if (matcher.matches()) {
-              int val = Integer.parseInt(matcher.group(1));
-              max = Math.max(max, val);
+          idToCollection.put(collection.getId(), collection.getName());
+
+          DocAssign docAssign = new DocAssign();
+          docAssign.name = collection.getName();
+          assignMap.put(docAssign.name, docAssign);
+          int max = 1;
+          Collection<Slice> slices = collection.getSlices();
+          for (Slice slice : slices) {
+            Collection<Replica> replicas = slice.getReplicas();
+
+            for (Replica replica : replicas) {
+              Matcher matcher = Assign.pattern.matcher(replica.getName());
+              if (matcher.matches()) {
+                int val = Integer.parseInt(matcher.group(1));
+                max = Math.max(max, val);
+              }
             }
           }
+          docAssign.replicaAssignCnt.set(max);
+        } finally {
+          collState.collLock.unlock();
         }
-        docAssign.replicaAssignCnt.set(max);
-      } finally {
-        collState.collLock.unlock();
-      }
-    });
+      });
 
-    ID.set(highId[0]);
+      ID.set(highId[0]);
 
-    if (log.isDebugEnabled()) log.debug("zkStateWriter starting with cs {}", cs);
+      if (log.isDebugEnabled()) log.debug("zkStateWriter starting with cs {}", cs);
+    } catch (Exception e) {
+      log.error("Exception in ZkStateWriter init", e);
+    }
   }
 
   private static class DocAssign {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 6775b88..5bab87c 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
@@ -139,7 +140,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -674,6 +674,68 @@ public class CoreContainer implements Closeable {
 
     loaded = true;
 
+
+    List<CoreDescriptor> cds = coresLocator.discover(this);
+
+    checkForDuplicateCoreNames(cds);
+    status |= CORE_DISCOVERY_COMPLETE;
+
+    solrCores.load(loader);
+
+    if (isZooKeeperAware()) {
+      try {
+        zkSys.start(this);
+      } catch (IOException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      } catch (KeeperException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      }
+    }
+
+    if (isZooKeeperAware()) {
+      try {
+        getZkController().publishNodeAs(getZkController().getNodeName(), OverseerAction.RECOVERYNODE);
+      } catch (Exception e) {
+        log.error("Failed publishing loading core as recovering", e);
+      }
+
+      List<CoreDescriptor> removeCds = new ArrayList<>();
+      for (final CoreDescriptor cd : cds) {
+
+        DocCollection docCollection = getZkController().getClusterState().getCollectionOrNull(cd.getCollectionName());
+        if (docCollection != null) {
+          Replica replica = docCollection.getReplica(cd.getName());
+          if (replica != null && !getZkController().getBaseUrl().equals(replica.getBaseUrl()) || Long.parseLong(cd.getCoreProperty("collId", "0")) != docCollection.getId()) {
+            removeCds.add(cd);
+            log.error("IllegalStateException wrong base url or collection id for this node in replica entry replica={}", replica);
+            try {
+              while (Files.exists(cd.getInstanceDir())) {
+                try {
+                  Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).forEach(new FileConsumer());
+                } catch (NoSuchFileException | UncheckedIOException e) {
+
+                }
+              }
+            } catch (Exception e) {
+              SolrException.log(log, "Failed to delete instance dir for core:" + cd.getName() + " dir:" + cd.getInstanceDir());
+            }
+
+          }
+        }
+        markCoreAsLoading(cd.getName());
+        String collection = cd.getCollectionName();
+        getZkController().getZkStateReader().registerCore(collection, cd.getName());
+
+      }
+      for (CoreDescriptor removeCd : removeCds) {
+        cds.remove(removeCd);
+      }
+    } else {
+      for (final CoreDescriptor cd : cds) {
+        markCoreAsLoading(cd.getName());
+      }
+    }
+
     // Always add $SOLR_HOME/lib to the shared resource loader
     Set<String> libDirs = new LinkedHashSet<>();
     libDirs.add("lib");
@@ -706,8 +768,6 @@ public class CoreContainer implements Closeable {
 
     try {
 
-      solrCores.load(loader);
-
       logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);
 
       hostName = cfg.getNodeName();
@@ -720,16 +780,6 @@ public class CoreContainer implements Closeable {
       createHandler(ZK_PATH, ZookeeperInfoHandler.class.getName(), ZookeeperInfoHandler.class);
       createHandler(ZK_STATUS_PATH, ZookeeperStatusHandler.class.getName(), ZookeeperStatusHandler.class);
 
-      if (isZooKeeperAware()) {
-        try {
-          zkSys.start(this);
-        } catch (IOException e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        } catch (KeeperException e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        }
-      }
-
       try (ParWork work = new ParWork(this, false, false)) {
 
         boolean enableMetrics = Boolean.parseBoolean(System.getProperty("solr.enableMetrics", "true"));
@@ -852,59 +902,14 @@ public class CoreContainer implements Closeable {
 
     List<Future<SolrCore>> coreLoadFutures = null;
 
-    List<CoreDescriptor> cds = coresLocator.discover(this);
+
     coreLoadFutures = new ArrayList<>(cds.size());
     if (isZooKeeperAware()) {
       cds = CoreSorter.sortCores(this, cds);
     }
-    checkForDuplicateCoreNames(cds);
-    status |= CORE_DISCOVERY_COMPLETE;
-    startedLoadingCores = true;
-
-    if (isZooKeeperAware()) {
-
-      log.info("Waiting to see not ACTIVE states for node on startup ...");
-      String nodeName = getZkController().getNodeName();
-      for (final CoreDescriptor cd : cds) {
-        String collection = cd.getCollectionName();
-        getZkController().getZkStateReader().registerCore(collection, cd.getName());
-        try {
-          getZkController().getZkStateReader().waitForState(collection, 3, TimeUnit.SECONDS, (n, c) -> {
-            if (c == null) {
-              if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
-              return false;
-            }
-
-
-            List<Replica> replicas = c.getReplicas();
-            for (Replica replica : replicas) {
-              log.trace("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
-              if (replica.getNodeName().equals(nodeName)) {
-                if (replica.getState().equals(State.ACTIVE)) {
-                  if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={} replica={}", replica.getState(), replica.getNodeName(), nodeName, replica);
-                  return false;
-                }
-              } else {
-               // if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
-              }
-            }
 
-            return true;
-          });
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          return;
-        } catch (TimeoutException e) {
-          log.error("Timeout", e);
-        }
-      }
-    }
+    startedLoadingCores = true;
 
-    for (final CoreDescriptor cd : cds) {
-      if (!cd.isTransient() && cd.isLoadOnStartup()) {
-        solrCores.markCoreAsLoading(cd);
-      }
-    }
 
     if (isZooKeeperAware()) {
       zkSys.getZkController().createEphemeralLiveNode();
@@ -917,28 +922,29 @@ public class CoreContainer implements Closeable {
         solrCores.addCoreDescriptor(cd);
       }
 
-      if (isZooKeeperAware()) {
-        String collection = cd.getCollectionName();
-
-        if (!zkSys.zkController.getClusterState().hasCollection(collection)) {
-          solrCores.markCoreAsNotLoading(cd);
-          try {
-            coresLocator.delete(this, cd);
-          } catch (Exception e) {
-            log.error("Exception deleting core.properties file for non existing collection", e);
-          }
-
-          try {
-            unload(cd, cd.getName(),true, true, true);
-          } catch (Exception e) {
-            log.error("Exception unloading core for non existing collection", e);
-          }
-          continue;
-        }
-      }
+      // MRM TODO: look at ids for this
+//      if (isZooKeeperAware()) {
+//        String collection = cd.getCollectionName();
+//
+//        if (!zkSys.zkController.getClusterState().hasCollection(collection)) {
+//          solrCores.markCoreAsNotLoading(cd);
+//          try {
+//            coresLocator.delete(this, cd);
+//          } catch (Exception e) {
+//            log.error("Exception deleting core.properties file for non existing collection", e);
+//          }
+//
+//          try {
+//            unload(cd, cd.getName(),true, true, true);
+//          } catch (Exception e) {
+//            log.error("Exception unloading core for non existing collection", e);
+//          }
+//          continue;
+//        }
+//      }
 
       if (cd.isLoadOnStartup()) {
-
+        startedLoadingCores = true;
         coreLoadFutures.add(solrCoreExecutor.submit(() -> {
           SolrCore core = null;
           MDCLoggingContext.setCoreName(cd.getName());
@@ -1351,16 +1357,19 @@ public class CoreContainer implements Closeable {
       ParWork.propagateInterrupt(ex);
       // First clean up any core descriptor, there should never be an existing core.properties file for any core that
       // failed to be created on-the-fly.
-      coresLocator.delete(this, cd);
-      if (isZooKeeperAware() && !preExisitingZkEntry) {
-        try {
-          getZkController().unregister(coreName, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
-        } catch (Exception e) {
-          log.error("", e);
+      try {
+        coresLocator.delete(this, cd);
+        if (isZooKeeperAware() && !preExisitingZkEntry && zkSys.getZkController().getZkClient().isAlive()) {
+          try {
+            getZkController().unregister(coreName, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
+          } catch (Exception e) {
+            log.error("", e);
+          }
+        }
+      } finally {
+        if (core != null) {
+          core.doClose();
         }
-      }
-      if (core != null) {
-        core.closeAndWait();
       }
 
       Throwable tc = ex;
@@ -1439,36 +1448,27 @@ public class CoreContainer implements Closeable {
       timeValidateCoreNameLoadConfigSet.done();
 
       try {
+        core = new SolrCore(this, dcore, coreConfig);
+      } catch (Exception e) {
+        core = processCoreCreateException(e, dcore, coreConfig);
+      }
 
-        try {
-          core = new SolrCore(this, dcore, coreConfig);
-        } catch (Exception e) {
-          core = processCoreCreateException(e, dcore, coreConfig);
-        }
-
-        core.start();
+      core.start();
 
-        StopWatch timeRegisterCore = new StopWatch(dcore.getName() + "-registerCore");
-        old = registerCore(dcore, core, true);
-        registered = true;
-        timeRegisterCore.done();
+      StopWatch timeRegisterCore = new StopWatch(dcore.getName() + "-registerCore");
+      registerCore(dcore, core, true);
+      registered = true;
+      timeRegisterCore.done();
 
-        if (isZooKeeperAware()) {
-          StopWatch timeKickOffAsyncZkReg = new StopWatch(dcore.getName() + "-kickOffAsyncZkReg");
-          if (!newCollection) {
-            if (core.getDirectoryFactory().isSharedStorage()) {
-              zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
-            }
+      if (isZooKeeperAware()) {
+        StopWatch timeKickOffAsyncZkReg = new StopWatch(dcore.getName() + "-kickOffAsyncZkReg");
+        if (!newCollection) {
+          if (core.getDirectoryFactory().isSharedStorage()) {
+            zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
           }
-          ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
-          timeKickOffAsyncZkReg.done();
         }
-
-      } catch (Exception e) {
-
-        throw new SolrException(ErrorCode.SERVER_ERROR, e);
-      } finally {
-        solrCores.markCoreAsNotLoading(dcore);
+        ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
+        timeKickOffAsyncZkReg.done();
       }
 
       // always kick off recovery if we are in non-Cloud mode
@@ -1480,6 +1480,7 @@ public class CoreContainer implements Closeable {
     } catch (Exception e) {
       ParWork.propagateInterrupt(e);
       log.error("Unable to create SolrCore", e);
+      solrCores.markCoreAsNotLoading(dcore);
       coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
       if (e instanceof ZkController.NotInClusterStateException && !newCollection) {
         // this mostly happen when the core is deleted when this node is down
@@ -1506,31 +1507,10 @@ public class CoreContainer implements Closeable {
             SolrCore finalCore1 = core;
             try {
               solrCoreExecutor.submit(() -> {
-                finalCore1.closeAndWait();
-              });
-            } catch (RejectedExecutionException e) {
-              finalCore1.closeAndWait();
-            }
-            SolrCore finalOld = old;
-            if (finalOld != null) {
-              try {
-                solrCoreExecutor.submit(() -> {
-                  finalOld.closeAndWait();
-                });
-              } catch (RejectedExecutionException e) {
-                finalOld.closeAndWait();
-              }
-            }
-
-          }
-          if (isShutDown) {
-            SolrCore finalCore1 = core;
-            try {
-              solrCoreExecutor.submit(() -> {
-                finalCore1.closeAndWait();
+                finalCore1.doClose();
               });
             } catch (RejectedExecutionException e) {
-              finalCore1.closeAndWait();
+              finalCore1.doClose();
             }
           }
         }
@@ -1968,6 +1948,7 @@ public class CoreContainer implements Closeable {
     try {
 
       if (isZooKeeperAware()) {
+        getZkController().clearCachedState(name);
         if (cd != null) {
           try {
             zkSys.getZkController().unregister(name, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index f380a0f..34f028d 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -749,13 +749,13 @@ public final class SolrCore implements SolrInfoBean, Closeable {
             coreContainer.solrCoreExecutor.submit(() -> {
               try {
                 log.warn("Closing failed SolrCore from failed reload");
-                finalCore.closeAndWait();
+                finalCore.close();
               } catch (Exception e) {
                 log.error("Exception waiting for core to close on reload failure", e);
               }
             });
           } catch (RejectedExecutionException e) {
-            finalCore.closeAndWait();
+            finalCore.close();
           }
         }
       }
@@ -2479,6 +2479,9 @@ public final class SolrCore implements SolrInfoBean, Closeable {
             tmp = new SolrIndexSearcher(this, newIndexDir, getLatestSchema(), (realtime ? "realtime" : "main"), newReader, true, !realtime, true,
                 directoryFactory);
           } else {
+            if (coreContainer.isShutDown() || closing) {
+              throw new AlreadyClosedException();
+            }
             RefCounted<IndexWriter> writer = getSolrCoreState().getIndexWriter(this);
             DirectoryReader newReader = null;
             try {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 78a20d5..ae29756 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -86,7 +86,8 @@ class SolrCores implements Closeable {
   }
 
   public void load(SolrResourceLoader loader) {
-    transientCoreCache = TransientSolrCoreCacheFactory.newInstance(loader, container);
+    // TODO
+    // transientCoreCache = TransientSolrCoreCacheFactory.newInstance(loader, container);
   }
 
   // We are shutting down. You can't hold the lock on the various lists of cores while they shut down, so we need to
@@ -462,8 +463,8 @@ class SolrCores implements Closeable {
   public TransientSolrCoreCache getTransientCacheHandler() {
 
     if (transientCoreCache == null) {
-      log.error("No transient handler has been defined. Check solr.xml to see if an attempt to provide a custom {}"
-          , "TransientSolrCoreCacheFactory was done incorrectly since the default should have been used otherwise.");
+//      log.error("No transient handler has been defined. Check solr.xml to see if an attempt to provide a custom {}"
+//          , "TransientSolrCoreCacheFactory was done incorrectly since the default should have been used otherwise.");
       return null;
     }
     return transientCoreCache.getTransientSolrCoreCache();
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 44baa39..8b388cc 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -731,7 +731,7 @@ public class IndexFetcher {
     ZkController zkController = solrCore.getCoreContainer().getZkController();
     CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
     Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
-        cd.getCollectionName(), cd.getShardId(), 3000, true);
+        cd.getCollectionName(), cd.getShardId(), 5000);
     return leaderReplica;
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 8f925c4..260e8e0 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -73,7 +74,7 @@ public class ColStatus {
   }
 
   @SuppressWarnings({"unchecked"})
-  public void getColStatus(NamedList<Object> results) {
+  public void getColStatus(NamedList<Object> results) throws TimeoutException, InterruptedException {
     Collection<String> collections;
     String col = props.getStr(ZkStateReader.COLLECTION_PROP);
     if (col == null) {
@@ -155,7 +156,7 @@ public class ColStatus {
           sliceMap.add("routingRules", rules);
         }
         sliceMap.add("replicas", replicaMap);
-        Replica leader = zkStateReader.getLeader(collection, s.getName());
+        Replica leader = zkStateReader.getLeaderRetry(collection, s.getName(), 10000);
         if (leader == null) { // pick the first one
           leader = s.getReplicas().size() > 0 ? s.getReplicas().iterator().next() : null;
         }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 5f9f90e..bbf9cef 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -394,6 +394,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     } else {
       if (log.isDebugEnabled()) log.debug("no data in response, checking for timeout");
       if (System.nanoTime() - time >= TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
+        if (log.isDebugEnabled()) {
+          coreContainer.getZkController().getZkClient().printLayout();
+        }
         throw new SolrException(ErrorCode.SERVER_ERROR, operation
             + " the collection time out:" + timeout / 1000 + "s " + m);
       } else if (event.getWatchedEvent() != null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 8f17127..dee1675 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,9 +18,7 @@
 package org.apache.solr.handler.admin;
 
 import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.ZkController.NotInClusterStateException;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
@@ -53,12 +51,28 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
 
     String shard = params.get(ZkStateReader.SHARD_ID_PROP);
 
-    Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
+    String state = params.get(ZkStateReader.STATE_PROP);
+
+    Replica.State waitForState = null;
+    if (state != null) {
+      waitForState = Replica.State.getState(state);
+    }
 
     log.info(
         "Going to wait for core: {}, state: {}: params={}",
         cname, waitForState, params);
 
+    LeaderElector leaderElector = it.handler.coreContainer.getZkController().getLeaderElector(leaderName);
+    if (leaderElector == null || !leaderElector.isLeader()) {
+      throw new IllegalStateException("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+          " coll=" + collection);
+    }
+
+    if (waitForState == null) {
+      log.info("Done checking leader:", cname);
+      return;
+    }
+
     assert TestInjection.injectPrepRecoveryOpPauseForever();
 
     CoreContainer coreContainer = it.handler.coreContainer;
@@ -66,6 +80,8 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
     AtomicReference<String> errorMessage = new AtomicReference<>();
 
     try {
+      Replica.State finalWaitForState = waitForState;
+      Replica.State finalWaitForState1 = waitForState;
       coreContainer.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
         if (c == null) {
           return false;
@@ -81,9 +97,9 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
 
         isLive = coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName());
         if (isLive) {
-          if (replica.getState() == waitForState) {
+          if (replica.getState() == finalWaitForState) {
             if (log.isDebugEnabled()) {
-              log.debug("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), waitForState,
+              log.debug("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), finalWaitForState,
                   coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName()));
             }
             return true;
@@ -93,24 +109,15 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
         }
 
         errorMessage.set(
-            "Timeout waiting to see " + waitForState + " state replica=" + cname + " state=" + (replica == null ? "(null replica)" : replica.getState())
-                + " waitForState=" + waitForState + " isLive=" + isLive + "\n" + coreContainer.getZkController().getZkStateReader().getClusterState()
-                .getCollectionOrNull(collection));
+            "Timeout waiting to see " + finalWaitForState + " state replica=" + cname + " state=" + (replica == null ? "(null replica)" : replica.getState())
+                + " waitForState=" + finalWaitForState1 + " isLive=" + isLive);
         return false;
       });
 
     } catch (TimeoutException | InterruptedException e) {
       ParWork.propagateInterrupt(e);
       String error = errorMessage.get();
-      if (error == null)
-        error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
-      throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+      log.error(error);
     }
-
-//    LeaderElector leaderElector = it.handler.coreContainer.getZkController().getLeaderElector(leaderName);
-//    if (leaderElector == null || !leaderElector.isLeader()) {
-//      throw new IllegalStateException("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
-//          " coll=" + it.handler.getCoreContainer().getZkController().getClusterState().getCollectionOrNull(collection));
-//    }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 558f67e..d5152ca 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -49,6 +49,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.LeaderElector;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentBase;
@@ -140,7 +141,14 @@ public class RealTimeGetComponent extends SearchComponent
     }
 
     String onlyIfLeader = params.get("onlyIfLeader");
-    // MRM TODO:
+
+    if (req.getCore().getCoreContainer().isZooKeeperAware() && Boolean.parseBoolean(onlyIfLeader)) {
+      LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+      if (leaderElector == null || !leaderElector.isLeader()) {
+        throw new IllegalStateException("Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+            " coll=" + req.getCore().getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
+      }
+    }
 
     String val = params.get("getFingerprint");
     if(val != null) {
diff --git a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
index f4cf0b8..3e99fa0 100644
--- a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
+++ b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
@@ -149,7 +149,7 @@ public abstract class ManagedResourceStorage {
         dir.mkdirs();
 
       storageDir = dir.getAbsolutePath();      
-      log.info("File-based storage initialized to use dir: {}", storageDir);
+      log.debug("File-based storage initialized to use dir: {}", storageDir);
     }
     
     @Override
@@ -332,7 +332,7 @@ public abstract class ManagedResourceStorage {
         byte[] znodeData = toByteArray();
         try {
           zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
-          log.info("Wrote {} bytes to new znode {}", znodeData.length, znodePath);
+          log.debug("Wrote {} bytes to new znode {}", znodeData.length, znodePath);
 
         } catch (KeeperException.NodeExistsException e) {
           try {
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index e1cb9e8..15912da 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -18,6 +18,7 @@ package org.apache.solr.schema;
 
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.IOUtils;
@@ -106,7 +107,7 @@ public class ZkIndexSchemaReader implements OnReconnect, Closeable {
     public void close() throws IOException {
       try {
         schemaReader.zkClient.removeWatches(schemaReader.managedSchemaPath, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         if (log.isDebugEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index a83165b..c9e803a 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -260,6 +260,7 @@ public class HttpSolrCall {
       core = cores.getCore(origCorename);
 
       if (core == null && cores.isCoreLoading(origCorename)) {
+        log.debug("core is loading, will wait a bit");
         cores.waitForLoadingCore(origCorename, 5000);
         core = cores.getCore(origCorename);
       }
@@ -349,10 +350,10 @@ public class HttpSolrCall {
           solrReq = parser.parse(core, path, req);
         }
 
-        invalidStates = checkStateVersionsAreValid(queryParams.get(CloudSolrClient.STATE_VERSION));
 
-        ensureStatesAreAtLeastAtClient();
+        invalidStates = checkStateVersionsAreValid(getCollectionsList(), queryParams.get(CloudSolrClient.STATE_VERSION));
 
+        ensureStatesAreAtLeastAtClient();
         addCollectionParamIfNeeded(getCollectionsList());
 
         action = PROCESS;
@@ -494,17 +495,16 @@ public class HttpSolrCall {
 
   protected void extractRemotePath(String collectionName) throws UnsupportedEncodingException, KeeperException, InterruptedException, SolrException, TimeoutException {
 
-    ensureStatesAreAtLeastAtClient();
 
     coreUrl = getRemoteCoreUrl(collectionName);
     // don't proxy for internal update requests
-    Map<String,Integer> invalidStates = checkStateVersionsAreValid(queryParams.get(CloudSolrClient.STATE_VERSION));
+//    Map<String,Integer> invalidStates = checkStateVersionsAreValid(getCollectionsList(), queryParams.get(CloudSolrClient.STATE_VERSION));
     if (coreUrl != null
         && queryParams.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
-      if (invalidStates != null) {
-        //it does not make sense to send the request to a remote node
-        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, new String(Utils.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
-      }
+//      if (invalidStates != null) {
+//        //it does not make sense to send the request to a remote node
+//        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, new String(Utils.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
+//      }
       action = REMOTEQUERY;
     } else {
       if (!retry) {
@@ -1033,8 +1033,20 @@ public class HttpSolrCall {
   }
 
   /** Returns null if the state ({@link CloudSolrClient#STATE_VERSION}) is good; otherwise returns state problems. */
-  private Map<String, Integer> checkStateVersionsAreValid(String stateVer) {
+  private Map<String, Integer> checkStateVersionsAreValid(List<String> collectionsList, String stateVer) {
     // TODO: for collections that are local and watched, we should just wait for the right min state, not eager fetch everything
+//    Set<String> colList = cores.getZkController().getZkStateReader().getClusterState().getCollectionsMap().keySet();
+//    if ((stateVer == null || stateVer.isEmpty()) && cores.isZooKeeperAware()) {
+//      StringBuilder sb = new StringBuilder();
+//      for (String collection : colList) {
+//        if (sb.length() > 0) {
+//          sb.append("|");
+//        }
+//        sb.append(collection + ":0>0");
+//      }
+//      stateVer = sb.toString();
+//    }
+
     Map<String, Integer> result = null;
     String[] pairs;
     if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
@@ -1043,7 +1055,12 @@ public class HttpSolrCall {
       for (String pair : pairs) {
         String[] pcs = StringUtils.split(pair, ':');
         if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
-          Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1]));
+
+          String[] versionAndUpdatesHash = pcs[1].split(">");
+          int version = Integer.parseInt(versionAndUpdatesHash[0]);
+          int updateHash = Integer.parseInt(versionAndUpdatesHash[1]);
+
+          Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], version, updateHash);
           if (status != null) {
             if (result == null) result = new HashMap<>();
             result.put(pcs[0], status);
@@ -1083,8 +1100,6 @@ public class HttpSolrCall {
   protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) throws TimeoutException, InterruptedException {
     log.debug("get core by collection {} {}", collectionName, isPreferLeader);
 
-    ensureStatesAreAtLeastAtClient();
-
     ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
 
     ClusterState clusterState = zkStateReader.getClusterState();
@@ -1095,6 +1110,12 @@ public class HttpSolrCall {
       return null;
     }
 
+    try {
+      zkStateReader.waitForActiveCollection(collectionName, 10000, TimeUnit.MILLISECONDS, true, collection.getSlices().size(), collection.getReplicas().size(), false);
+    } catch (Exception e) {
+      log.warn("Did not find leaders for collection:" + collection.getName());
+    }
+
     if (isPreferLeader) {
       List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
       log.debug("preferLeader leaderReplicas={}", leaderReplicas);
@@ -1114,7 +1135,7 @@ public class HttpSolrCall {
       RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
       while (it.hasNext()) {
         Replica replica = it.next();
-        if (!checkActive || (liveNodes.contains(replica.getNodeName()))) {
+        if (liveNodes.contains(replica.getNodeName())) {
           SolrCore core = checkProps(replica);
           if (core != null && checkActive && replica.getState() != Replica.State.ACTIVE) {
             try {
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index 27896e1..60131a5 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -406,27 +406,24 @@ public class SolrDispatchFilter extends BaseSolrFilter {
   
   public void close() {
     CoreContainer cc = cores;
-
     try {
-      if (metricManager != null) {
-        try {
-          metricManager.unregisterGauges(registryName, metricTag);
-        } catch (NullPointerException e) {
-          // okay
-        } catch (Exception e) {
-          log.warn("Exception closing FileCleaningTracker", e);
-        } finally {
-          metricManager = null;
-        }
-      }
+//      if (metricManager != null) {
+//        try {
+//          metricManager.unregisterGauges(registryName, metricTag);
+//        } catch (NullPointerException e) {
+//          // okay
+//        } catch (Exception e) {
+//          log.warn("Exception closing FileCleaningTracker", e);
+//        } finally {
+//          metricManager = null;
+//        }
+//      }
     } finally {
-     // if (!cc.isShutDown()) {
+      if (!cc.isShutDown()) {
         log.info("CoreContainer is not yet shutdown during filter destroy, shutting down now {}", cc);
         GlobalTracer.get().close();
         stopCoreContainer(cc);
-    //  }
-
-
+      }
 
 //      if (SolrLifcycleListener.isRegisteredStopped(stopRunnable)) {
 //        SolrLifcycleListener.removeStopped(stopRunnable);
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index a1c8249..1bfe076 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * Used for distributing commands from a shard leader to its replicas.
  */
 public class SolrCmdDistributor implements Closeable {
-  private static final int MAX_RETRIES_ON_FORWARD = 2;
+  private static final int MAX_RETRIES_ON_FORWARD = 10;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final ConnectionManager.IsClosed isClosed;
   private final ZkStateReader zkStateReader;
@@ -136,6 +136,11 @@ public class SolrCmdDistributor implements Closeable {
       }
 
       if (err.req.retries < maxRetries && doRetry && !isClosed.isClosed()) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+
+        }
         err.req.retries++;
 
         SolrException.log(SolrCmdDistributor.log, "sending update to "
@@ -457,7 +462,7 @@ public class SolrCmdDistributor implements Closeable {
 
   public static class StdNode extends Node {
     protected final ZkStateReader zkStateReader;
-    protected Replica nodeProps;
+    protected volatile Replica nodeProps;
     protected String collection;
     protected String shardId;
     private final boolean retry;
@@ -505,7 +510,7 @@ public class SolrCmdDistributor implements Closeable {
 
     @Override
     public boolean checkRetry() {
-      return true;
+      return false;
     }
 
     @Override
@@ -579,6 +584,7 @@ public class SolrCmdDistributor implements Closeable {
 
     @Override
     public boolean checkRetry() {
+      log.debug("check retry");
       Replica leaderProps;
       try {
         leaderProps = zkStateReader.getLeaderRetry(
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index a757c47..8de165c 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -231,7 +231,8 @@ public class SolrIndexWriter extends IndexWriter {
       directoryFactory.release(getDirectory());
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating IndexWriter");
     }
-    assert ObjectReleaseTracker.track(this);
+    // TODO:
+    // assert ObjectReleaseTracker.track(this);
   }
 
   public static Directory getDir(DirectoryFactory directoryFactory, String path, SolrIndexConfig config) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index b9c6461..5eb07c9 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1913,7 +1913,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
 
 
-  protected RecoveryInfo recoveryInfo;
+  protected volatile RecoveryInfo recoveryInfo;
 
   class LogReplayer implements Runnable {
     private Logger loglog = log;  // set to something different?
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 1a2d28a..01a7572 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -466,7 +466,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           // we're not in an active state, and this update isn't from a replay, so buffer it.
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.add(cmd);
-          log.info("docid={} dropped because not active and buffering and not a replay update", cmd.getPrintableId());
+          log.debug("docid={} buffering update", cmd.getPrintableId());
           if (SkyHook.skyHookDoc != null) {
             SkyHook.skyHookDoc.register(cmd.getPrintableId(), "dropping update, non logic applied, but we are buffering - added to ulog only");
           }
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index b5ce87c..e59b421 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.LeaderElector;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkShardTerms;
@@ -164,7 +165,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
     Replica leaderReplica;
     try {
-      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId(), 10000);
+      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId(), 1000);
     } catch (Exception e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
@@ -682,7 +683,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           + "failed since we're not in cloud mode.");
     }
     try {
-      return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 3000, true).getCoreUrl();
+      return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 5000).getCoreUrl();
     } catch (InterruptedException | TimeoutException e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
@@ -773,10 +774,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     String shardId = slice.getName();
     Replica leaderReplica;
     try {
+
+      doDefensiveChecks(phase);
+
       // Not equivalent to getLeaderProps, which  retries to find a leader.
-      // Replica leader = slice.getLeader();
-      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 5000);
-      isLeader = leaderReplica.getName().equals(desc.getName());
+      leaderReplica = slice.getLeader();
+//      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 10000);
+      isLeader = leaderReplica != null && leaderReplica.getName().equals(desc.getName());
 
       if (!isLeader) {
         isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
@@ -785,8 +789,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         }
       }
 
-      doDefensiveChecks(phase);
-
       // if request is coming from another collection then we want it to be sent to all replicas
       // even if its phase is FROMLEADER
       String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
@@ -958,7 +960,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
       Replica myLeader = null;
       try {
-        myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 1500, true);
+        myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 5000);
       } catch (Exception e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
       }
@@ -1152,6 +1154,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     DocCollection docCollection = clusterState.getCollection(collection);
     Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
 
+    if (DistribPhase.TOLEADER == phase) {
+      LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+      if (leaderElector == null || !leaderElector.isLeader()) {
+        throw new IllegalStateException(
+            "Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) + " coll=" + req.getCore()
+                .getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
+      }
+    }
+
     if (DistribPhase.FROMLEADER == phase && isLeader && from != null) { // from will be null on log replay
       String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
       if (fromShard != null) {
diff --git a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
index 7400591..351ecec 100644
--- a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
+++ b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
@@ -149,7 +149,7 @@ public abstract class AbstractPluginLoader<T>
     AtomicReference<T> defaultPlugin = new AtomicReference<>();
     XPath xpath = loader.getXPath();
     if (nodes !=null ) {
-      for (int i=0; i<nodes.size(); i++) {
+      for (int i = 0; i < nodes.size(); i++) {
         try (ParWork parWork = new ParWork(this, false, true)) {
           NodeInfo node = nodes.get(i);
 
@@ -164,40 +164,39 @@ public abstract class AbstractPluginLoader<T>
             }
 
             String finalName = name;
-            parWork.collect(name, ()->{
-              try {
-                T plugin = create(loader, finalName, className, node, xpath);
 
-                if (log.isTraceEnabled()) {
-                  log.trace("created {}: {}", ((finalName != null) ? finalName : ""), plugin.getClass().getName());
-                }
+            try {
+              T plugin = create(loader, finalName, className, node, xpath);
 
-                // Either initialize now or wait till everything has been registered
-                if (preRegister) {
-                  info.add(new PluginInitInfo(plugin, node));
-                } else {
-                  init(plugin, node);
-                }
+              if (log.isTraceEnabled()) {
+                log.trace("created {}: {}", ((finalName != null) ? finalName : ""), plugin.getClass().getName());
+              }
 
-                T old = register(finalName, plugin);
-                if (old != null && !(finalName == null && !requireName)) {
-                  throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple " + type + " registered to the same name: " + finalName + " ignoring: " + old);
-                }
+              // Either initialize now or wait till everything has been registered
+              if (preRegister) {
+                info.add(new PluginInitInfo(plugin, node));
+              } else {
+                init(plugin, node);
+              }
 
-                if (defaultStr != null && Boolean.parseBoolean(defaultStr)) {
-                  if (defaultPlugin.get() != null) {
-                    throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple default " + type + " plugins: " + defaultPlugin + " AND " + finalName);
-                  }
-                  defaultPlugin.set(plugin);
-                }
-              } catch (Exception e) {
-                if (e instanceof RuntimeException) {
-                  throw (RuntimeException) e;
-                } else {
-                  throw new SolrException(ErrorCode.SERVER_ERROR, e);
+              T old = register(finalName, plugin);
+              if (old != null && !(finalName == null && !requireName)) {
+                throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple " + type + " registered to the same name: " + finalName + " ignoring: " + old);
+              }
+
+              if (defaultStr != null && Boolean.parseBoolean(defaultStr)) {
+                if (defaultPlugin.get() != null) {
+                  throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple default " + type + " plugins: " + defaultPlugin + " AND " + finalName);
                 }
+                defaultPlugin.set(plugin);
               }
-            });
+            } catch (Exception e) {
+              if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+              } else {
+                throw new SolrException(ErrorCode.SERVER_ERROR, e);
+              }
+            }
 
           } catch (Exception ex) {
             ParWork.propagateInterrupt(ex);
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index b94af95..45be93a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -89,7 +89,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     System.setProperty("solr.zkclienttimeout", "15000");
     System.setProperty("zkClientTimeout", "15000");
 
-
+    System.setProperty("solr.getleader.looptimeout", "10000");
     String timeout = "640000";
     System.setProperty("solr.http2solrclient.default.idletimeout", timeout);
     System.setProperty("distribUpdateSoTimeout", timeout);
@@ -438,6 +438,18 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 //    Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
 //    assertEquals(1, coresStatus.size());
 
+    cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 5000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> {
+      if (collectionState == null) {
+        return false;
+      }
+      for (Replica replica  : collectionState.getReplicas()) {
+        if (replica.getState() != Replica.State.ACTIVE) {
+          return  false;
+        }
+      }
+      return true;
+    });
+
     DocCollection testCollection = getCollectionState(collectionName);
 
     Replica replica1 = testCollection.getReplicas().iterator().next();
@@ -555,6 +567,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   @LuceneTestCase.Nightly
   public void testColStatus() throws Exception {
     final String collectionName = "collectionStatusTest";
+
     CollectionAdminRequest.createCollection(collectionName, "conf2", 2, 2)
         .setMaxShardsPerNode(100)
         .waitForFinalState(true)
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 5e7a97e..4a506ed 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -101,6 +101,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     Create req = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2).waitForFinalState(true);
     req.process(cluster.getSolrClient());
 
+    cluster.waitForActiveCollection(collectionName, 2, 4);
+
     DocCollection state = getCollectionState(collectionName);
     Slice shard = getRandomShard(state);
     
@@ -151,7 +153,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     final String collectionName = "deletereplica_test";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).waitForFinalState(true).process(cluster.getSolrClient());
 
-    Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1");
+    Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000, true);
 
     //Confirm that the instance and data directory exist
     CoreStatus coreStatus = getCoreStatus(leader);
@@ -162,9 +164,9 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     req.setWaitForFinalState(true);
     req.process(cluster.getSolrClient());
 
-    Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 2000);
-
+    log.info("leader was {}", leader);
 
+    Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000, true);
 
     org.apache.solr.common.util.TimeOut timeOut = new org.apache.solr.common.util.TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
     while (!timeOut.hasTimedOut()) {
@@ -398,7 +400,9 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
       int finalI = i;
       threads[i] = new Thread(() -> {
         int doc = finalI * (TEST_NIGHTLY ? 10000 : 100);
+        int cnt = 0;
         while (!closed.get()) {
+          cnt++;
           try {
             cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(doc++)));
           }  catch (AlreadyClosedException e) {
@@ -407,15 +411,18 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
           } catch (Exception e) {
             log.error("Failed on adding document to {}", collectionName, e);
           }
+          // TODO: why did this not stop anymore? Need to write out state sooner?
+          if (cnt > 10000) {
+            break;
+          }
         }
       });
       futures.add(ParWork.getRootSharedExecutor().submit(threads[i]));
     }
 
-
-
+    Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000);
     Slice shard1 = getCollectionState(collectionName).getSlice("s1");
-    Replica nonLeader = shard1.getReplicas(rep -> !rep.getName().equals(shard1.getLeader().getName())).get(0);
+    Replica nonLeader = shard1.getReplicas(rep -> !rep.getName().equals(leader.getName())).get(0);
     CollectionAdminRequest.DeleteReplica req = CollectionAdminRequest.deleteReplica(collectionName, "s1", nonLeader.getName());
     req.setWaitForFinalState(true);
     req.process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index 95e9bb7..7740fa2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -137,6 +137,8 @@ public class DeleteShardTest extends SolrCloudTestCase {
         .waitForFinalState(true)
         .process(cluster.getSolrClient());
 
+    cluster.waitForActiveCollection(collection, 3, 3);
+
     // Get replica details
     Replica leader = getCollectionState(collection).getLeader("a");
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 8c8a05e..b39cdf3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -526,7 +526,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
       final String shardName = entry.getKey();
       final Slice slice = entry.getValue();
       log.info("Checking: {} -> {}", shardName, slice);
-      final Replica leader = entry.getValue().getLeader();
+      final Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collection.getName(), shardName, 5000);
       try (Http2SolrClient leaderClient = SolrTestCaseJ4.getHttpSolrClient(leader.getCoreUrl())) {
         final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
         log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
index af02db3..b447534 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
@@ -98,8 +98,6 @@ public class RecoveryZkTest extends SolrCloudTestCase {
 
     cluster.getSolrClient().setDefaultCollection(collection);
 
-    cluster.waitForActiveCollection(collection, 1, 2);
-
     // start a couple indexing threads
     
     int[] maxDocList = new int[] {25, 55};
@@ -156,15 +154,27 @@ public class RecoveryZkTest extends SolrCloudTestCase {
 
     cluster.waitForActiveCollection(collection, 1, 2);
 
-    new UpdateRequest()
-        .commit(cluster.getSolrClient(), collection);
-
-    cluster.waitForActiveCollection(collection, 1, 2);
-
     // test that leader and replica have same doc count
-    state = getCollectionState(collection);
-    assertShardConsistency(state.getSlice("s1"), true);
 
+    int cnt = 0;
+    while (true) {
+      try {
+        new UpdateRequest().commit(cluster.getSolrClient(), collection);
+      } catch (Exception e) {
+        log.info("commit fail", e);
+      }
+
+      try {
+        state = getCollectionState(collection);
+        assertShardConsistency(state.getSlice("s1"), true);
+        break;
+      } catch (AssertionError error) {
+        if (cnt++ > 5) {
+          throw error;
+        }
+      }
+      Thread.sleep(500);
+    }
   }
 
   private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
index 335e0cd..f1db9fb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
@@ -440,7 +440,7 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
         leader = tmp;
         break;
       }
-      Thread.sleep(50);
+      Thread.sleep(250);
     }
     assertNotNull("Could not find active leader for " + shardId + " of " +
         testCollectionName + " after "+timeoutSecs+" secs;", leader);
diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
index e341bae..2a1d35a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
@@ -23,7 +23,6 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -147,9 +146,9 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
       //   updateMappingsFromZk(this.jettys, this.clients);
       leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 5)));
       if (deadJetty == leaderJetty) {
-        Thread.sleep(100);
+        Thread.sleep(500);
       }
-      if (cnt++ >= 3) {
+      if (cnt++ >= 30) {
         fail("don't expect leader to be on the jetty we stopped deadJetty=" + deadJetty.getNodeName() + " leaderJetty=" + leaderJetty.getNodeName());
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
index 0037f1a..bad1713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
@@ -130,6 +130,8 @@ public class TestCloudDeleteByQuery extends SolrCloudTestCase {
         .setProperties(collectionProperties)
         .process(cluster.getSolrClient());
 
+    cluster.waitForActiveCollection(COLLECTION_NAME, NUM_SHARDS, NUM_SHARDS * REPLICATION_FACTOR);
+
     CLOUD_CLIENT = cluster.getSolrClient();
     CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
     
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
index 5b7e436..9f04280 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -37,11 +38,13 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.cloud.ClusterStateUtil;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.metrics.SolrMetricManager;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -62,6 +65,9 @@ public class TestCloudRecovery extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
+
+    System.setProperty("solr.getleader.looptimeout", "10000");
+    System.setProperty("solr.recovery.maxretries", "5");
     System.setProperty("solr.enableMetrics", "true");
     System.setProperty("solr.disableDefaultJmxReporter", "false");
     System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
@@ -73,7 +79,7 @@ public class TestCloudRecovery extends SolrCloudTestCase {
   public void setUp() throws Exception {
     super.setUp();
     useFactory(null);
-    configureCluster(2)
+    configureCluster(4)
         .addConfig("config", SolrTestUtil.TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
 
@@ -137,6 +143,14 @@ public class TestCloudRecovery extends SolrCloudTestCase {
 
     cluster.waitForActiveCollection(COLLECTION, 2, 2 * (nrtReplicas + tlogReplicas));
 
+    TimeOut timeout = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      resp = cloudClient.query(COLLECTION, params);
+      if (resp.getResults().getNumFound() >= 4) {
+        break;
+      }
+    }
+
     resp = cloudClient.query(COLLECTION, params);
     assertEquals(4, resp.getResults().getNumFound());
     // Make sure all leader nodes recover from tlog
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index a15fd89..2e6678e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -71,6 +71,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       node2.stop();
 
       cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
+      cluster.waitForActiveCollection(COLLECTION, 1, 1, true);
 
       UpdateRequest req = new UpdateRequest();
       for (int i = 0; i < 100; i++) {
@@ -154,7 +155,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
       assertEquals("30", v.toString());
     }
-    Replica oldLeader = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION).getLeader("s1");
+    Replica oldLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION,"s1");
 
 
     node1.stop();
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index e42bada..deed435 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -122,6 +122,7 @@ public class TestPullReplica extends SolrCloudTestCase {
         CollectionAdminRequest
             .createCollection(collectionName, "conf", 2, 1, 0, 3).waitForFinalState(true)
             .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(collectionName, 2, 8);
         break;
       case 1:
         // Sometimes use v1 API
@@ -132,6 +133,7 @@ public class TestPullReplica extends SolrCloudTestCase {
             3);    // pullReplicas);
         url = url + SolrTestCaseUtil.pickRandom("", "&nrtReplicas=1", "&replicationFactor=1"); // These options should all mean the same
         Http2SolrClient.GET(url, cluster.getSolrClient().getHttpClient());
+        cluster.waitForActiveCollection(collectionName, 2, 8);
         break;
       case 2:
         // Sometimes use V2 API
@@ -146,6 +148,7 @@ public class TestPullReplica extends SolrCloudTestCase {
             .POST(url, cluster.getSolrClient().getHttpClient(),
                 requestBody.getBytes("UTF-8"), "application/json");
         assertEquals(200, response.status);
+        cluster.waitForActiveCollection(collectionName, 2, 8);
         break;
     }
     boolean reloaded = false;
@@ -213,7 +216,7 @@ public class TestPullReplica extends SolrCloudTestCase {
     int numPullReplicas = 1 + random().nextInt(3);
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numPullReplicas).waitForFinalState(true)
     .process(cluster.getSolrClient());
-    waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", collectionName, clusterShape(1, numPullReplicas + 1));
+    cluster.waitForActiveCollection(collectionName, 1, numPullReplicas + 1);
     DocCollection docCollection = assertNumberOfReplicas(1, 0, numPullReplicas, false, true);
     assertEquals(1, docCollection.getSlices().size());
 
@@ -524,7 +527,7 @@ public class TestPullReplica extends SolrCloudTestCase {
   }
 
   private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query) throws IOException, SolrServerException, InterruptedException {
-    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    TimeOut t = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     for (Replica r : replicas) {
       if (cluster.getSolrClient().getZkStateReader().isNodeLive(r.getNodeName())) {
         try (Http2SolrClient replicaClient = SolrTestCaseJ4.getHttpSolrClient(r.getCoreUrl())) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
index e1a673b..6ac8c58 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
@@ -83,7 +83,7 @@ public class TestSegmentSorting extends SolrCloudTestCase {
               .setProperties(collectionProperties);
 
     assertTrue( cmd.process(cloudSolrClient).isSuccess() );
-
+    cluster.waitForActiveCollection(collectionName, NUM_SHARDS, NUM_SHARDS * (TEST_NIGHTLY ? REPLICATION_FACTOR : 1));
     
     cloudSolrClient.setDefaultCollection(collectionName);
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index 438b9a3..2a65bb4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -56,7 +56,7 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
       (1, SolrTestUtil.createTempDir(), buildJettyConfig("/solr"));
     try {
       log.info("Create our collection");
-      CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+      CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).waitForFinalState(true).process(cluster.getSolrClient());
                                            
       log.info("Shutdown 1 node");
       final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
index 8344c8d..0f679fd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
@@ -52,6 +52,7 @@ public class CollectionReloadTest extends SolrCloudTestCase {
 
     final String testCollectionName = "c8n_1x1";
     CollectionAdminRequest.createCollection(testCollectionName, "conf", 1, 1)
+        .waitForFinalState(true)
         .process(cluster.getSolrClient());
 
     CollectionAdminRequest.reloadCollection(testCollectionName).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index bfd55b3..dbe3b12 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.cloud.StoppableIndexingThread;
 import org.apache.solr.common.ParWork;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 
 @Slow
 @LuceneTestCase.Nightly
+@Ignore // tmp using too large for test ram
 public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -56,7 +58,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
 
   @Test
   public void start() throws Exception {
-    int collectionCnt = 5;
+    int collectionCnt = 80;
     List<Future> futures = new ArrayList<>();
     List<Future> indexFutures = new ArrayList<>();
     for (int i = 0; i < collectionCnt; i ++) {
@@ -96,18 +98,22 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
       cluster.waitForActiveCollection(collectionName, 4, 16);
     }
 
-
+    List<JettySolrRunner> stoppedRunners = new ArrayList<>();
     for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
       log.info("Stopping {}", runner);
+      if (random().nextBoolean()) {
+        continue;
+      }
       runner.stop();
+      stoppedRunners.add(runner);
     }
 
-    for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+    for (JettySolrRunner runner : stoppedRunners) {
       log.info("Starting {}", runner);
       runner.start();
     }
 
-
+    Thread.sleep(5000);
     for (int r = 0; r < 2; r++) {
       for (int i = 0; i < collectionCnt; i++) {
         final String collectionName = "testCollection" + i;
diff --git a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
index d3201f0..954dd35 100644
--- a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
@@ -75,7 +75,7 @@ public class CachingDirectoryFactoryTest extends SolrTestCaseJ4 {
       incRefThread.start();
     }
 
-    Thread.sleep(TEST_NIGHTLY ? 2000 : 50);
+    Thread.sleep(TEST_NIGHTLY ? LuceneTestCase.atLeast(500) : 50);
 
     Thread closeThread = new Thread() {
       public void run() {
diff --git a/solr/core/src/test/org/apache/solr/core/TestConfigSets.java b/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
index 70af6a0..1584883 100644
--- a/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
+++ b/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
@@ -136,10 +136,10 @@ public class TestConfigSets extends SolrTestCaseJ4 {
       // Now copy in a config with a /dump handler and reload
       FileUtils.copyFile(SolrTestUtil.getFile("solr/collection1/conf/solrconfig-withgethandler.xml"), new File(new File(configSetsDir, "configset-2/conf"), "solrconfig.xml"));
       container.reload("core1");
-      core.close();
+
       core = container.getCore("core1");
       assertThat("A /dump handler should be defined in the reloaded configuration", core.getRequestHandler("/dump"), is(notNullValue()));
-      core.close();
+
     } finally {
       container.shutdown();
     }
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index f8f5284..400699f 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -196,6 +196,8 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       CollectionAdminRequest.createCollection(collectionName, configName, 2, 2).waitForFinalState(true)
           .process(miniCluster.getSolrClient());
 
+      miniCluster.waitForActiveCollection(collectionName, 2, 4);
+
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(ShardParams.SHARDS_TOLERANT, "requireZkConnected");
       QueryRequest req = new QueryRequest(params);
diff --git a/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java b/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
index ef17209..11a07f6 100644
--- a/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
+++ b/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
@@ -69,6 +69,8 @@ abstract public class SolrRestletTestBase extends RestTestBase {
 
   @After
   public void tearDown() throws Exception {
+    if (jetty != null) jetty.stop();
+    jetty = null;
     super.tearDown();
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java b/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
index 56dc635..31a235d 100644
--- a/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
+++ b/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
@@ -34,10 +34,11 @@ public class TestUniqueKeyFieldResource extends SolrRestletTestBase {
 
   @After
   public void tearDown() throws Exception {
-    super.tearDown();
     if (jetty != null) {
       jetty.stop();
     }
+    jetty = null;
+    super.tearDown();
   }
 
   @Test
diff --git a/solr/server/etc/jetty-http.xml b/solr/server/etc/jetty-http.xml
index a38208b..b24cd98 100644
--- a/solr/server/etc/jetty-http.xml
+++ b/solr/server/etc/jetty-http.xml
@@ -34,7 +34,7 @@
             <Item>
               <New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory">
                 <Arg name="config"><Ref refid="httpConfig" /></Arg>
-                <Set name="maxConcurrentStreams">1024</Set>
+                <Set name="maxConcurrentStreams">4096</Set>
                 <Set name="inputBufferSize">8192</Set>
                 <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
                 <Set name="rateControlFactory">
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index c12b4f6..7ac4417 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -54,7 +54,7 @@
             <Item>
               <New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
                 <Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
-                <Set name="maxConcurrentStreams">1024</Set>
+                <Set name="maxConcurrentStreams">4096</Set>
                 <Set name="inputBufferSize">8192</Set>
                 <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
                 <Set name="rateControlFactory">
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
index a4c8326..258d62a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
@@ -186,7 +186,12 @@ public class ShardTerms implements MapWriter {
    */
   public ShardTerms setTermEqualsToLeader(String coreNodeName) {
     long maxTerm = getMaxTerm();
-    if (values.get(coreNodeName) == maxTerm) return null;
+    Long term = values.get(coreNodeName);
+    if (term == null) {
+      registerTerm(coreNodeName);
+      term = 0l;
+    }
+    if (term == maxTerm) return null;
 
     Map<String, Long> newValues = new ConcurrentHashMap<>(values);
     newValues.put(coreNodeName, maxTerm);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 9357419..69f090b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -102,7 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
 
   private volatile String defaultCollection;
   //no of times collection state to be reloaded if stale state error is received
-  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "3"));
+  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "10"));
   private Random rand = new Random();
 
   private final boolean updatesToLeaders;
@@ -112,7 +112,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
   private final ExecutorService threadPool;
   private String idField = ID;
   public static final String STATE_VERSION = "_stateVer_";
-  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
+  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);//5 seconds, expressed in nanos
   private final Set<String> NON_ROUTABLE_PARAMS;
   {
     NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -366,7 +366,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       catch (RuntimeException e) {
         // not ready yet, then...
       }
-      TimeUnit.MILLISECONDS.sleep(250);
+      TimeUnit.MILLISECONDS.sleep(50);
     }
     throw new TimeoutException("Timed out waiting for cluster");
   }
@@ -482,9 +482,9 @@ public abstract class BaseCloudSolrClient extends SolrClient {
 
     DocCollection col = getDocCollection(collection, null);
 
-    if (col == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collection);
-    }
+//    if (col == null) {
+//      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collection);
+//    }
 
     DocRouter router = col.getRouter();
 
@@ -838,7 +838,6 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     }
     List<String> inputCollections =
         collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-
     return requestWithRetryOnStaleState(request, 0, inputCollections);
   }
 
@@ -878,18 +877,18 @@ public abstract class BaseCloudSolrClient extends SolrClient {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
         }
         int collVer = coll.getZNodeVersion();
-        if (coll.getStateFormat()>1) {
-          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
-          requestedCollections.add(coll);
 
-          if (stateVerParamBuilder == null) {
-            stateVerParamBuilder = new StringBuilder();
-          } else {
-            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
-          }
+        if (requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+        requestedCollections.add(coll);
 
-          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        if (stateVerParamBuilder == null) {
+          stateVerParamBuilder = new StringBuilder();
+        } else {
+          stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
         }
+        Map su = coll.getStateUpdates();
+        stateVerParamBuilder.append(coll.getName()).append(":").append(collVer).append(">").append(su == null ? 0 : su.hashCode());
+
       }
 
       if (stateVerParamBuilder != null) {
@@ -980,7 +979,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       }
 
       boolean stateWasStale = false;
-      if (retryCount < MAX_STALE_RETRIES  &&
+      if (retryCount <= MAX_STALE_RETRIES  &&
           requestedCollections != null    &&
           !requestedCollections.isEmpty() &&
           (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
@@ -997,7 +996,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
 
       // if we experienced a communication error, it's worth checking the state
       // with ZK just to make sure the node we're trying to hit is still part of the collection
-      if (retryCount < MAX_STALE_RETRIES &&
+      if (retryCount <= MAX_STALE_RETRIES &&
           !stateWasStale &&
           requestedCollections != null &&
           !requestedCollections.isEmpty() &&
@@ -1020,7 +1019,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
 
       // if the state was stale, then we retry the request once with new state pulled from Zk
       if (stateWasStale) {
-        log.warn("Re-trying request to collection(s) {} after stale state error from server. retryCound={}", inputCollections, retryCount);
+        log.warn("Re-trying request to collection(s) {} after stale state error from server.", inputCollections);
         resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
       } else {
         if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
@@ -1030,62 +1029,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
         }
       }
     }
-    if (resp != null && resp.get("exception") == null) {
-      waitForClusterStateUpdates(request, resp);
-    }
 
     return resp;
   }
 
-  protected void waitForClusterStateUpdates(SolrRequest request, NamedList<Object> resp) {
-
-    // TODO - better check that this is a collections api call
-    if (request.getParams() == null || request.getParams().get(CoreAdminParams.ACTION) == null) {
-      return;
-    }
-
-    String action = request.getParams().get(CoreAdminParams.ACTION);
-
-    String collection = request.getParams().get("collection");
-    if (collection == null) {
-      collection = request.getParams().get("name");
-    }
-    if (collection != null) {
-      Integer ver = (Integer) resp.get("csver");
-      if (ver != null) {
-        try {
-          log.info("Wait for catch up to server state {}", ver);
-          getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
-            if (collectionState != null && collectionState.getZNodeVersion() >= ver) {
-              return true;
-            }
-            return false;
-          });
-        } catch (TimeoutException | InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for collection version " + ver, e);
-        }
-      } else if (request.getParams().get(CoreAdminParams.ACTION).equals(CollectionParams.CollectionAction.DELETE.toString())) {
-//        try {
-//          getZkStateReader().waitForState(collection, 10, TimeUnit.SECONDS, (c) -> c == null);
-//        } catch (InterruptedException e) {
-//          ParWork.propagateInterrupt(e);
-//          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
-//        } catch (TimeoutException e) {
-//          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-//        }
-      }
-
-    }
-  }
-
   protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
       throws SolrServerException, IOException {
-
-//    if (request.getParams().get(STATE_VERSION) == null) {
-//      throw new IllegalStateException("State version cannot be null " + request.getParams());
-//    }
-
     connect();
 
     boolean sendToLeaders = false;
@@ -1127,7 +1076,9 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       }
 
     } else if (ADMIN_PATHS.contains(request.getPath())) {
-      for (String liveNode : liveNodes) {
+      List<String> liveNodesList = new ArrayList<>(liveNodes);
+      Collections.shuffle(liveNodesList, rand);
+      for (String liveNode : liveNodesList) {
         theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
             getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
       }
@@ -1143,35 +1094,67 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       //   at every shard when getting leaders if we tweaked some things
 
       // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
-      Map<String,Slice> slices = new HashMap<>();
-      String shardKeys = reqParams.get(ShardParams._ROUTE_);
-      for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(collectionName, null);
-        if (col == null) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
-        }
-        List<Slice> routeSlices = new ArrayList<>(col.getRouter().getSearchSlices(shardKeys, reqParams , col));
-        Collections.shuffle(routeSlices);
-        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
-      }
-
-      // Gather URLs, grouped by leader or replica
       List<Replica> sortedReplicas = new ArrayList<>();
       List<Replica> replicas = new ArrayList<>();
-      for (Slice slice : slices.values()) {
-        Replica leader = slice.getLeader();
-        ArrayList<Replica> replicaList = new ArrayList<>(slice.getReplicas());
-        Collections.shuffle(replicaList);
-        for (Replica replica : replicaList) {
-          String node = replica.getNodeName();
-          if (!liveNodes.contains(node) // Must be a live node to continue
-              || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
-            continue;
-          if (sendToLeaders && replica.equals(leader)) {
-            sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
-          } else {
-            replicas.add(replica); // replicas here
+      for (int i = 0; i < 2; i++) {
+        Map<String,Slice> slices = new HashMap<>();
+        String shardKeys = reqParams.get(ShardParams._ROUTE_);
+        for (String collectionName : collectionNames) {
+          DocCollection col = getDocCollection(collectionName, null);
+          if (col == null) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
           }
+          slices.putAll(col.getSlicesMap());
+          List<Slice> routeSlices = new ArrayList<>(col.getRouter().getSearchSlices(shardKeys, reqParams, col));
+          Collections.shuffle(routeSlices);
+          ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+        }
+
+        // Gather URLs, grouped by leader or replica
+
+        for (Slice slice : slices.values()) {
+          Replica leader = slice.getLeader();
+          ArrayList<Replica> replicaList = new ArrayList<>(slice.getReplicas());
+          Collections.shuffle(replicaList);
+          for (Replica replica : replicaList) {
+            String node = replica.getNodeName();
+            if (!liveNodes.contains(node) // Must be a live node to continue
+                || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+              continue;
+            if (sendToLeaders && replica.equals(leader)) {
+              sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+            } else {
+              replicas.add(replica); // replicas here
+            }
+          }
+        }
+
+        if (sortedReplicas.size() == 0) {
+          if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
+            ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) getClusterStateProvider();
+            getClusterStateProvider().connect();
+            for (String collectionName : collectionNames) {
+              try {
+                provider.zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, (liveNodes1, collectionState) -> {
+                  if (collectionState == null) {
+                    return false;
+                  }
+                  List<Replica> reps = collectionState.getReplicas();
+                  for (Replica rep : reps) {
+                    if (liveNodes1.contains(rep.getNodeName()) && rep.getState() == Replica.State.ACTIVE) {
+                      return true;
+                    }
+                  } return false;
+                });
+              } catch (InterruptedException e) {
+
+              } catch (TimeoutException e) {
+
+              }
+            }
+          }
+        } else {
+          break;
         }
       }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d4e7cdc..1020d22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -314,20 +314,23 @@ public class Http2SolrClient extends SolrClient {
     if (log.isTraceEnabled()) log.trace("Closing {} closeClient={}", this.getClass().getSimpleName(), closeClient);
     // assert closeTracker != null ? closeTracker.close() : true;
     try {
-      asyncTracker.close();
-    } catch (Exception e) {
-      log.error("Exception closing httpClient asyncTracker", e);
-    }
-    closed = true;
-    if (closeClient) {
       try {
-        httpClient.stop();
+        asyncTracker.close();
       } catch (Exception e) {
-        log.error("Exception closing httpClient", e);
+        log.error("Exception closing httpClient asyncTracker", e);
+      }
+      closed = true;
+      if (closeClient) {
+        try {
+          httpClient.stop();
+        } catch (Exception e) {
+          log.error("Exception closing httpClient", e);
+        }
       }
+      if (log.isTraceEnabled()) log.trace("Done closing {}", this.getClass().getSimpleName());
+    } finally {
+      assert ObjectReleaseTracker.release(this);
     }
-    if (log.isTraceEnabled()) log.trace("Done closing {}", this.getClass().getSimpleName());
-    assert ObjectReleaseTracker.release(this);
   }
 
   public void waitForOutstandingRequests() {
@@ -1150,7 +1153,7 @@ public class Http2SolrClient extends SolrClient {
     private SSLConfig sslConfig = defaultSSLConfig;
     private Integer idleTimeout = DEFAULT_IDLE_TIME;
     private Integer connectionTimeout;
-    private Integer maxConnectionsPerHost = 32;
+    private Integer maxConnectionsPerHost = 64;
     private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
     protected String baseSolrUrl;
     protected Map<String,String> headers = new HashMap<>(12);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index b55ce1a..13ad78c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -63,11 +63,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Integer numPullReplicas;
   private final Integer maxShardsPerNode;
   private final Boolean readOnly;
-  private final Map stateUpdates;
+  private volatile Map stateUpdates;
   private final Long id;
 
   private AtomicInteger sliceAssignCnt = new AtomicInteger();
-  private volatile boolean createdLazy;
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
     this(name, slices, props, router, -1, null);
@@ -265,11 +264,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public String toString() {
-    return "DocCollection("+name+":" + ":v=" + znodeVersion + " u=" + stateUpdates + " l=" + createdLazy + ")=" + toJSONString(this);
-  }
-  
-  public void setCreatedLazy() {
-    this.createdLazy = true;
+    return "DocCollection("+name+":" + ":v=" + znodeVersion + " u=" + stateUpdates + ")=" + toJSONString(this);
   }
 
   @Override
@@ -451,6 +446,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return stateUpdates != null;
   }
 
+  public void setStateUpdates(Map stateUpdates) {
+    this.stateUpdates = stateUpdates;
+  }
+
   public void setSliceAssignCnt(int i) {
     sliceAssignCnt.set(i);
   }
@@ -466,4 +465,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   public void setZnodeVersion(int version) {
     this.znodeVersion = version;
   }
+
+  public Map<String, Slice> getSlicesCopy() {
+    return new LinkedHashMap<>(slices);
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index ae9f960..3f58db4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -266,7 +266,6 @@ public class Replica extends ZkNodeProps {
   // only to be used by ZkStateWriter currently
   public void setState(State state) {
     this.state = state;
-    propMap.put(ZkStateReader.STATE_PROP, state.toString());
   }
 
   public boolean isActive(Set<String> liveNodes) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 2a22b9c..1aaa259 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -279,7 +279,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   private Replica findLeader() {
     for (Replica replica : replicas.values()) {
       String leaderStr = replica.getStr(LEADER);
-      if (leaderStr != null && leaderStr.equals("true") && replica.getState() == Replica.State.ACTIVE) {
+      if (leaderStr != null && leaderStr.equals("true")) {
         return replica;
       }
     }
@@ -347,7 +347,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   // only to be used by ZkStateWriter currently
   public void setState(State state) {
     this.state = state;
-    propMap.put(ZkStateReader.STATE_PROP, state.toString());
   }
 
   // only to be used by ZkStateWriter currently
@@ -365,7 +364,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   @Override
   public String toString() {
-    return name + ':' + toJSONString(propMap);
+    return name + "[" + leader + "]"  + ':' + toJSONString(propMap);
   }
 
   @Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 2a48875..454e30c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -490,6 +490,11 @@ public class SolrZkClient implements Closeable {
     }
   }
 
+  public void create(final String path, final byte data[], CreateMode createMode, AsyncCallback.Create2Callback cb) throws KeeperException, InterruptedException {
+    List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+    connManager.getKeeper().create(path, data, acls, createMode, cb, "create", -1);
+  }
+
   public String create(final String path, final byte[] data, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
     return create(path, data, createMode, retryOnConnLoss, false);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 30de809..256859d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.CloudInspectUtil;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.Callable;
@@ -67,8 +68,11 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeOut;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.zookeeper.AddWatchMode;
@@ -197,6 +201,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
 
+  private final ConcurrentHashMap<String, ReentrantLock> collectionStateLocks = new ConcurrentHashMap<>(32, 0.75f, 16);
+
   /**
    * Watchers of Collection properties
    */
@@ -260,7 +266,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     public boolean canBeRemoved() {
       int refCount = coreRefCount.get();
       int watcherCount = stateWatchers.size();
-      log.trace("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
+      log.debug("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
       return refCount <= 0 && watcherCount <= 0;
     }
 
@@ -382,27 +388,38 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     try {
       refreshCollectionList();
       refreshLiveNodes();
+
       // Need a copy so we don't delete from what we're iterating over.
       watchedCollectionStates.forEach((name, coll) -> {
         DocCollection newState = null;
+        ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+        collectionStateLock.lock();
         try {
-          newState = fetchCollectionState(name);
-        } catch (Exception e) {
-          log.error("problem fetching update collection state", e);
-          return;
-        }
+          try {
+            newState = fetchCollectionState(name);
+          } catch (Exception e) {
+            log.error("problem fetching update collection state", e);
+            return;
+          }
+
+          String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+          try {
+            getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
+          } catch (Exception e) {
+            log.error("Error fetching state updates", e);
+          }
 
-        if (updateWatchedCollection(name, newState, false)) {
-          constructState(newState);
+          if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
+            constructState(newState, "forciblyRefreshAllClusterStateSlow");
+          }
+        } finally {
+          collectionStateLock.unlock();
         }
       });
 
-    } catch (KeeperException e) {
+    } catch (Exception e) {
       log.error("", e);
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
-    } catch (InterruptedException e) {
-      ParWork.propagateInterrupt(e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
 
   }
@@ -418,11 +435,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       DocCollection newState = fetchCollectionState(name);
 
-      if (newState == null) {
-        return;
+      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+      try {
+        getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
+      } catch (Exception e) {
+        log.error("Error fetching state updates", e);
       }
 
-      if (updateWatchedCollection(name, newState, false)) {
+      if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
         updatedCollections.add(newState);
       }
 
@@ -442,30 +462,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     refreshLiveNodes();
   }
 
-  public Integer compareStateVersions(String coll, int version) {
+  public Integer compareStateVersions(String coll, int version, int updateHash) {
+    log.debug("compareStateVersions {} {} {}", coll, version, updateHash);
     DocCollection collection = getCollectionOrNull(coll);
     if (collection == null) return null;
-    if (collection.getZNodeVersion() < version) {
+    if (collection.getZNodeVersion() != version || (collection.getZNodeVersion() == version && collection.hasStateUpdates() && updateHash != collection.getStateUpdates().hashCode())) {
       if (log.isDebugEnabled()) {
         log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
       }
       DocCollection nu = getCollectionLive(coll);
+      log.debug("got collection {} {} {}", nu);
       if (nu == null) return -3;
-      if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
-        if (updateWatchedCollection(coll, nu, false)) {
-          constructState(nu);
-          String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
-          try {
-            nu = getAndProcessStateUpdates(coll, stateUpdatesPath, true, nu, null);
-          } catch (Exception e) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, e);
-          }
-        }
-        collection = nu;
-      }
+
+      constructState(nu, "compareStateVersions");
     }
 
-    if (collection.getZNodeVersion() == version) {
+    if (collection.getZNodeVersion() == version && (!collection.hasStateUpdates() || updateHash == collection.getStateUpdates().hashCode())) {
       return null;
     }
 
@@ -545,14 +557,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       // on reconnect of SolrZkClient force refresh and re-add watches.
       loadClusterProperties();
 
-      IOUtils.closeQuietly(liveNodesWatcher);
-
       this.liveNodesWatcher = new LiveNodeWatcher();
 
       this.liveNodesWatcher.createWatch();
       this.liveNodesWatcher.refresh();
 
-      IOUtils.closeQuietly(collectionsChildWatcher);
       this.collectionsChildWatcher = new CollectionsChildWatcher();
 
       this.collectionsChildWatcher.createWatch();
@@ -622,10 +631,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         });
   }
 
-  private void constructState(DocCollection collection) {
-    constructState(collection, "general");
-  }
-
   /**
    * Construct the total state view from all sources.
    *
@@ -634,27 +639,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   private void constructState(DocCollection collection, String caller) {
 
-    if (log.isDebugEnabled()) log.trace("construct new cluster state on structure change {} {}", caller, collection);
-
-
+    log.trace("construct new cluster state on structure change {} {}", caller, collection);
 
     log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
           clusterState.keySet());
 
-//
-//    watchedCollectionStates.forEach((s, slices) -> {
-//      clusterState.putIfAbsent(s, new ClusterState.CollectionRef(slices));
-//      lazyCollectionStates.remove(s);
-//    });
-//
-//    // Finally, add any lazy collections that aren't already accounted for.
-//    lazyCollectionStates.forEach((s, lazyCollectionRef) -> {
-//      clusterState.putIfAbsent(s, lazyCollectionRef);
-//    });
-    if (collection != null) {
-      this.clusterState.put(collection.getName(), new ClusterState.CollectionRef(collection));
-    }
-
     notifyCloudCollectionsListeners(true);
 
     if (collection != null) {
@@ -680,6 +669,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     log.debug("found collections {}", children);
     // First, drop any children that disappeared.
     this.lazyCollectionStates.keySet().retainAll(children);
+    log.debug("lazyCollectionState retained collections {}", children);
     for (String coll : children) {
       // We will create an eager collection for any interesting collections, so don't add to lazy.
       if (!collectionWatches.containsKey(coll)) {
@@ -690,6 +680,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
           if (old == null) {
             clusterState.put(coll, docRef);
+            ReentrantLock collectionStateLock = new ReentrantLock(true);
+            ReentrantLock oldLock = collectionStateLocks.putIfAbsent(coll, collectionStateLock);
 
             log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
                 watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
@@ -698,30 +690,30 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
     }
 
-//    List<String> finalChildren = children;
-//    watchedCollectionStates.keySet().forEach(col -> {
-//      if (!finalChildren.contains(col)) {
-//        log.debug("remove watched collection state due to live node {}", col);
-//        watchedCollectionStates.remove(col);
-//        CollectionStateWatcher sw = stateWatchersMap.remove(col);
-//        if (sw != null) sw.removeWatch();
-//        IOUtils.closeQuietly(sw);
-//        if (collectionRemoved != null) {
-//          collectionRemoved.removed(col);
-//        }
-//        if (sw != null) {
-//          ReentrantLock lock = sw.collectionStateLock;
-//          if (lock != null) {
-//            lock.lock();
-//            try {
-//              clusterState.getCollectionStates().remove(col);
-//            } finally {
-//              lock.unlock();
-//            }
-//          }
-//        }
-//      }
-//    });
+    List<String> finalChildren = children;
+    watchedCollectionStates.keySet().forEach(col -> {
+      if (!finalChildren.contains(col)) {
+        log.debug("remove watched collection state due to removed collections node {}", col);
+        watchedCollectionStates.remove(col);
+        CollectionStateWatcher sw = stateWatchersMap.remove(col);
+        if (sw != null) sw.removeWatch();
+        IOUtils.closeQuietly(sw);
+        if (collectionRemoved != null) {
+          collectionRemoved.removed(col);
+        }
+        if (sw != null) {
+          ReentrantLock collectionStateLock = collectionStateLocks.get(col);
+          if (collectionStateLock != null) {
+            collectionStateLock.lock();
+            try {
+              clusterState.remove(col);
+            } finally {
+              collectionStateLock.unlock();
+            }
+          }
+        }
+      }
+    });
 
   }
 
@@ -760,14 +752,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     boolean fire = true;
 
     newCollections = getCurrentCollections();
-    oldCollections = lastFetchedCollectionSet.getAndSet(newCollections);
-    if (!newCollections.equals(oldCollections) || notifyIfSame) {
-      fire = true;
-    }
+//    oldCollections = lastFetchedCollectionSet.getAndSet(newCollections);
+//    if (!newCollections.equals(oldCollections) || notifyIfSame) {
+//      fire = true;
+//    }
 
     log.trace("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
     if (fire) {
-      cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(oldCollections, newCollections));
+      cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(newCollections, newCollections));
     }
   }
 
@@ -810,7 +802,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           try {
             DocCollection cdc = getCollectionLive(collName);
             if (cdc != null) {
-              cdc.setCreatedLazy();
               lastUpdateTime = System.nanoTime();
               cachedDocCollection = cdc;
               return cdc;
@@ -960,15 +951,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public Replica getLeader(String collection, String shard) {
-    return getLeader(getCollectionOrNull(collection), shard);
-  }
-
-  private Replica getLeader(DocCollection docCollection, String shard) {
-    Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
-    if (replica != null && replica.getState() == Replica.State.ACTIVE) {
-      return replica;
+    try {
+      return getLeaderRetry(collection, shard, 5000);
+    } catch (InterruptedException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } catch (TimeoutException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
-    return null;
   }
 
 //  public Replica getLeader(String collection, String shard) {
@@ -994,75 +983,79 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard) throws InterruptedException, TimeoutException {
-    return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
+    return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT, false);
   }
 
-  /**
-   * Get shard leader properties, with retry if none exist.
-   */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
-    return getLeaderRetry(collection, shard, timeout, true);
+    return getLeaderRetry(collection, shard, timeout, false);
   }
 
   /**
    * Get shard leader properties, with retry if none exist.
    */
-  public Replica getLeaderRetry(String collection, String shard, int timeout, boolean mustBeLive) throws InterruptedException, TimeoutException {
-    DocCollection coll = getCollectionOrNull(collection);
-    if (coll != null) {
-      Slice slice = coll.getSlice(shard);
-      if (slice  != null) {
-        Replica leader = slice.getLeader();
-        if (leader != null && isNodeLive(leader.getNodeName())) {
-          return leader;
-        }
-      }
-    }
+  public Replica getLeaderRetry(String collection, String shard, int timeout, boolean checkValidLeader) throws InterruptedException, TimeoutException {
     AtomicReference<Replica> returnLeader = new AtomicReference<>();
-    try {
-      waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
-        if (c == null)
-          return false;
-        Slice slice = c.getSlice(shard);
-        if (slice == null) return false;
-        Replica zkLeader = null;
-        Replica leader = slice.getLeader();
-        if (leader != null && leader.getState() == Replica.State.ACTIVE) {
-          if (isNodeLive(leader.getNodeName())) {
+    DocCollection coll;
+    int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+    TimeOut leaderVerifyTimeout = new TimeOut(timeout, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    while (true) {
+
+      try {
+        waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
+          if (c == null) return false;
+          Slice slice = c.getSlice(shard);
+          if (slice == null) return false;
+          Replica leader = slice.getLeader();
+          if (leader != null) {
+            if (leader.getState() != Replica.State.ACTIVE) {
+              return false;
+            }
+
             returnLeader.set(leader);
             return true;
           }
-        }
-        // MRM TODO: remove
-        Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
-            if (isNodeLive(replica.getNodeName())) {
-              returnLeader.set(replica);
-              return true;
-            }
+
+          return false;
+        });
+      } catch (TimeoutException e) {
+        coll = getCollectionOrNull(collection);
+        throw new TimeoutException(
+            "No registered leader was found after waiting for " + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
+                + " with live_nodes=" + liveNodes);
+      }
+
+      Replica leader = returnLeader.get();
+      if (checkValidLeader) {
+        try (Http2SolrClient client = new Http2SolrClient.Builder("").idleTimeout(readTimeout).markInternalRequest().build()) {
+          CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+          prepCmd.setCoreName(leader.getName());
+          prepCmd.setLeaderName(leader.getName());
+          prepCmd.setCollection(leader.getCollection());
+          prepCmd.setShardId(leader.getSlice());
+
+          prepCmd.setBasePath(leader.getBaseUrl());
+
+          try {
+            NamedList<Object> result = client.request(prepCmd);
+            break;
+          } catch (Exception e) {
+            log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
           }
         }
+        if (leaderVerifyTimeout.hasTimedOut()) {
+          throw new SolrException(ErrorCode.SERVER_ERROR,
+              "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
+        }
 
-        return false;
-      });
-    } catch (TimeoutException e) {
-      coll = getCollectionOrNull(collection);
-      throw new TimeoutException("No registered leader was found after waiting for "
-          + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
-          + " with live_nodes=" + liveNodes + " zkLeaderNode=" + (coll == null ? "null collection" : getLeaderProps(collection, coll.getId(), shard)));
+      } else {
+        break;
+      }
     }
-
-    Replica leader = returnLeader.get();
-
-    if (leader == null) {
-      coll = getCollectionOrNull(collection);
-      throw new SolrException(ErrorCode.SERVER_ERROR, "No registered leader was found "
-          + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
-          + " with live_nodes=" + liveNodes + " zkLeaderNode=" + getLeaderProps(collection, coll.getId(), shard));
+    if (returnLeader.get() == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
     }
-
-    return leader;
+    return returnLeader.get();
   }
 
   private Replica getLeaderProps(final String collection, long collId, final String slice) {
@@ -1384,8 +1377,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   class CollectionStateWatcher implements Watcher, Closeable {
     private final String coll;
     private volatile StateUpdateWatcher stateUpdateWatcher;
-
-    private final ReentrantLock collectionStateLock = new ReentrantLock();
+    
     private volatile boolean closed;
 
     CollectionStateWatcher(String coll) {
@@ -1422,13 +1414,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
      * with the results of the refresh.
      */
     public void refresh() {
+      ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
       collectionStateLock.lock();
       try {
         DocCollection newState = fetchCollectionState(coll);
-
-        if (!updateWatchedCollection(coll, newState, false)) {
+        String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+        newState = getAndProcessStateUpdates(coll, stateUpdatesPath, newState, true);
+        if (!updateWatchedCollection(coll, newState == null ? null : new ClusterState.CollectionRef(newState))) {
           return;
         }
+
         constructState(newState, "state.json watcher");
       } catch (Exception e) {
         log.error("A ZK error has occurred refreshing CollectionStateWatcher for collection={}", coll, e);
@@ -1458,7 +1453,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       String collectionCSNPath = getCollectionSCNPath(coll);
       try {
         zkClient.removeWatches(collectionCSNPath, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -1466,7 +1461,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       try {
         zkClient.removeWatches(stateUpdateWatcher.stateUpdatesPath, stateUpdateWatcher, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -1475,12 +1470,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     public void refreshStateUpdates() {
       if (log.isDebugEnabled()) log.debug("fetch additional state updates {}", coll);
-
+      ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
       try {
         collectionStateLock.lock();
-        getAndProcessStateUpdates(coll, stateUpdateWatcher.stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
+        getAndProcessStateUpdates(coll, stateUpdateWatcher.stateUpdatesPath, getCollectionOrNull(coll), false);
       } catch (Exception e) {
         log.error("Unwatched collection: [{}]", coll, e);
+      } finally {
+        collectionStateLock.unlock();
       }
     }
 
@@ -1506,14 +1503,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       public void process(WatchedEvent event) {
         if (zkClient.isClosed() || closed) return;
         log.trace("_statupdates event {}", event);
-
+        ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+        collectionStateLock.lock();
         try {
-          collectionStateLock.lock();
-          getAndProcessStateUpdates(coll, stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
+          getAndProcessStateUpdates(coll, stateUpdatesPath, getCollectionOrNull(coll), false);
         } catch (AlreadyClosedException e) {
 
         } catch (Exception e) {
           log.error("Unwatched collection: [{}]", coll, e);
+        } finally {
+          collectionStateLock.unlock();
         }
       }
     }
@@ -1570,7 +1569,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       try {
         zkClient.removeWatches(znodePath, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
@@ -1627,7 +1626,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   // MRM TODO: persistent watch
   class CollectionsChildWatcher implements Watcher, Closeable {
-    volatile boolean watchRemoved = true;
+
     @Override
     public void process(WatchedEvent event) {
       if (ZkStateReader.this.closed) {
@@ -1667,7 +1666,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
 
     public void createWatch() {
-      watchRemoved = false;
       try {
         zkClient.addWatch(COLLECTIONS_ZKNODE, this, AddWatchMode.PERSISTENT);
       } catch (Exception e) {
@@ -1676,11 +1674,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
 
     public void removeWatch() {
-      if (watchRemoved) return;
-      watchRemoved = true;
       try {
         zkClient.removeWatches(COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.warn("Exception removing watch", e);
@@ -1697,7 +1693,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * Watches the live_nodes and syncs changes.
    */
   class LiveNodeWatcher implements Watcher, Closeable {
-    volatile boolean watchRemoved = true;
+
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher
@@ -1725,7 +1721,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
 
     public void createWatch() {
-      watchRemoved = false;
       try {
         zkClient.addWatch(LIVE_NODES_ZKNODE, this, AddWatchMode.PERSISTENT);
       } catch (Exception e) {
@@ -1735,13 +1730,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
 
     public void removeWatch() {
-      if (watchRemoved) {
-        return;
-      }
-      watchRemoved = true;
       try {
         zkClient.removeWatches(LIVE_NODES_ZKNODE, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.warn("Exception removing watch", e);
@@ -1757,37 +1748,34 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   public DocCollection getCollectionLive(String coll) {
     log.debug("getCollectionLive {}", coll);
     DocCollection newState;
+
+    ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+    collectionStateLock.lock();
     try {
       newState = fetchCollectionState(coll);
-      if (newState != null) {
-        String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
-        try {
-          newState = getAndProcessStateUpdates(coll, stateUpdatesPath, true, newState, null);
-        } catch (Exception e) {
-          log.error("", e);
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        }
-      }
+      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+      newState = getAndProcessStateUpdates(coll, stateUpdatesPath, newState, true);
+     // constructState(newState, "getCollectionLive");
     } catch (KeeperException e) {
       log.warn("Zookeeper error getting latest collection state for collection={}", coll, e);
       return null;
-    } catch (InterruptedException e) {
-      log.error("Unwatched collection: [{}]", coll, e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted", e);
+    } catch (Exception e) {
+      log.error("Exception fetching collection state: [{}]", coll, e);
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Exception fetching collection state: " + coll, e);
+    } finally {
+      collectionStateLock.unlock();
     }
 
     return newState;
-
   }
 
-  private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, boolean live, DocCollection docCollection, ReentrantLock collectionStateLock) throws KeeperException, InterruptedException {
-    DocCollection result = null;
+  private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, DocCollection docCollection, boolean live) throws KeeperException, InterruptedException {
     try {
       log.trace("get and process state updates for {}", coll);
 
       Stat stat;
       try {
-        stat = getZkClient().exists(stateUpdatesPath, null,true);
+        stat = getZkClient().exists(stateUpdatesPath, null, true, true);
         if (stat == null) {
           return docCollection;
         }
@@ -1798,7 +1786,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       if (docCollection != null && docCollection.hasStateUpdates()) {
         int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
-        if (stat.getVersion() <= oldVersion) {
+        if (stat.getVersion() < oldVersion) {
           if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
           return docCollection;
         }
@@ -1815,132 +1803,124 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       if (data == null) {
         log.info("No data found for {}", stateUpdatesPath);
+      //  docCollection.getStateUpdates().clear();
         return docCollection;
       }
 
       Map<String,Object> m = (Map) fromJSON(data);
-      if (log.isDebugEnabled()) log.debug("Got additional state updates {}", m);
+      log.trace("Got additional state updates {}", m);
       if (m.size() == 0) {
         return docCollection;
       }
 
       Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
-      if (log.isDebugEnabled()) log.debug("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
+      log.trace("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
 
-      m.remove("_cs_ver_");
+      //m.remove("_cs_ver_");
       m.put("_ver_", stat.getVersion());
-      try {
-        Set<Entry<String,Object>> entrySet = m.entrySet();
 
-        if (docCollection != null) {
-          if (version < docCollection.getZNodeVersion()) {
-            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+      Set<Entry<String,Object>> entrySet = m.entrySet();
+
+      if (docCollection != null) {
+        if (version < docCollection.getZNodeVersion()) {
+          if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+          return docCollection;
+        }
+
+        if (docCollection.hasStateUpdates()) {
+          int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
+          if (stat.getVersion() < oldVersion) {
+            if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
             return docCollection;
           }
+        }
 
-          if (docCollection.hasStateUpdates()) {
-            int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
-            if (stat.getVersion() < oldVersion) {
-              if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
-              return docCollection;
-            }
+
+        for (Entry<String,Object> entry : entrySet) {
+          String id = entry.getKey();
+          if (id.equals("_ver_") || id.equals("_cs_ver_")) continue;
+          Replica.State state = null;
+          if (!entry.getValue().equals("l")) {
+            state = Replica.State.shortStateToState((String) entry.getValue());
           }
 
-          for (Entry<String,Object> entry : entrySet) {
-            String id = entry.getKey();
-            if (id.equals("_ver_")) continue;
-            Replica.State state = null;
-            if (!entry.getValue().equals("l")) {
-              state = Replica.State.shortStateToState((String) entry.getValue());
-            }
+          Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
+          log.trace("Got additional state update {} replica={} id={} ids={}", state == null ? "leader" : state, replica == null ? null : replica.getName(), id, docCollection.getReplicaByIds());
+
+          if (replica != null) {
+
+            //     if (replica.getState() != state || entry.getValue().equals("l")) {
+            Slice slice = docCollection.getSlice(replica.getSlice());
+            Map<String,Replica> replicasMap = slice.getReplicasCopy();
+            Map properties = new HashMap(replica.getProperties());
 
-            Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
-            if (log.isTraceEnabled()) log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica.getName(), id, docCollection.getReplicaByIds());
-
-            if (replica != null) {
-
-              //     if (replica.getState() != state || entry.getValue().equals("l")) {
-              Slice slice = docCollection.getSlice(replica.getSlice());
-              Map<String,Replica> replicasMap = new HashMap(slice.getReplicasMap());
-              Map properties = new HashMap(replica.getProperties());
-              if (entry.getValue().equals("l")) {
-                if (log.isDebugEnabled()) log.debug("state is leader, set to active and leader prop");
-                properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
-                properties.put("leader", "true");
-
-                for (Replica r : replicasMap.values()) {
-                  if (replica.getName().equals(r.getName())) {
-                    continue;
-                  }
-                  log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
-                  if ("true".equals(r.getProperties().get(LEADER_PROP))) {
-                    log.debug("remove leader prop {}", r);
-                    Map<String,Object> props = new HashMap<>(r.getProperties());
-                    props.remove(LEADER_PROP);
-                    Replica newReplica = new Replica(r.getName(), props, coll, docCollection.getId(), r.getSlice(), ZkStateReader.this);
-                    replicasMap.put(r.getName(), newReplica);
-                  }
+            if (entry.getValue().equals("l")) {
+              log.trace("state is leader, set to active and leader prop id={}", replica.getId());
+              properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+              properties.put(LEADER_PROP, "true");
+
+              for (Replica r : replicasMap.values()) {
+                if (replica.getName().equals(r.getName())) {
+                  continue;
                 }
-              } else if (state != null && !properties.get(ZkStateReader.STATE_PROP).equals(state.toString())) {
-                log.trace("std state, set to {}", state);
-                properties.put(ZkStateReader.STATE_PROP, state.toString());
-                if ("true".equals(properties.get(LEADER_PROP))) {
-                  properties.remove(LEADER_PROP);
+                log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
+                if ("true".equals(r.getProperties().get(LEADER_PROP))) {
+                  log.debug("remove leader prop {}", r);
+                  Map<String,Object> props = new HashMap<>(r.getProperties());
+                  props.remove(LEADER_PROP);
+                  Replica newReplica = new Replica(r.getName(), props, coll, docCollection.getId(), r.getSlice(), ZkStateReader.this);
+                  replicasMap.put(r.getName(), newReplica);
                 }
               }
+            } else if (state != null) {
+              log.trace("std state, set to {}", state);
+              properties.put(ZkStateReader.STATE_PROP, state.toString());
+              if ("true".equals(properties.get(LEADER_PROP))) {
+                properties.remove(LEADER_PROP);
+              }
+            }
 
-              Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
-
-              log.trace("add new replica {}", newReplica);
+            Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
 
-              replicasMap.put(replica.getName(), newReplica);
+            log.trace("add new replica {}", newReplica);
 
-              Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), coll, replica.id, ZkStateReader.this);
-              //              if (newReplica.getProperty("leader") != null) {
-              //                newSlice.setLeader(newReplica);
-              //              }
+            replicasMap.put(replica.getName(), newReplica);
 
-              Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
-              newSlices.put(slice.getName(), newSlice);
+            Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), coll, docCollection.getId(), ZkStateReader.this);
 
-              log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+            Map<String,Slice> newSlices = docCollection.getSlicesCopy();
+            newSlices.put(slice.getName(), newSlice);
 
-              DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), m);
-              docCollection = newDocCollection;
+            log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
 
-              result = newDocCollection;
+            DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), version, m);
+            docCollection = newDocCollection;
 
-              //  }
-            } else {
-              if (log.isDebugEnabled()) log.debug("Could not find core to update local state {} {}", id, state);
-            }
-          }
-          if (result == null) return docCollection;
-          if (!live) {
-            watchedCollectionStates.put(result.getName(), result);
-
-            // Finally, add any lazy collections that aren't already accounted for.
-            //            lazyCollectionStates.forEach((s, lazyCollectionRef) -> {
-            //              if (!s.equals(coll)) {
-            //                result.putIfAbsent(s, lazyCollectionRef);
-            //              }
-            //
-            //            });
-
-            log.trace("Set a new clusterstate based on update diff {}", result);
-
-            updateWatchedCollection(coll, result, false);
-            constructState(result);
+          } else {
+            if (log.isDebugEnabled()) log.debug("Could not find core to update local state {} {}", id, state);
           }
         }
 
-      } catch (Exception e) {
-        log.error("exeption trying to process additional updates", e);
+
+        log.trace("Set a new clusterstate based on update diff {} live={}", docCollection, live);
+
+        if (live) {
+          return docCollection;
+        }
+
+        if (!updateWatchedCollection(coll, docCollection == null ? null : new ClusterState.CollectionRef(docCollection))) {
+          return docCollection;
+        }
+
+        constructState(docCollection, "state.json state updates watcher");
+
       }
-      return result == null ? docCollection : result;
-    } finally {
-      if (collectionStateLock != null) collectionStateLock.unlock();
+
+    } catch (Exception e) {
+      log.error("exception trying to process additional updates", e);
     }
+    return docCollection;
+
   }
 
   private DocCollection fetchCollectionState(String coll) throws KeeperException, InterruptedException {
@@ -1950,46 +1930,51 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     int version = 0;
 
-    Stat stateStat = zkClient.exists(collectionPath, null, true, false);
-    if (stateStat != null) {
-      version = stateStat.getVersion();
-      if (log.isDebugEnabled()) log.debug("version for cs is {}", version);
-      // version we would get
-      DocCollection docCollection = watchedCollectionStates.get(coll);
-      if (docCollection != null) {
-        int localVersion = docCollection.getZNodeVersion();
-        if (log.isDebugEnabled())
-          log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
-        if (docCollection.hasStateUpdates()) {
-          if (localVersion == version) {
-            return docCollection;
-          }
-        } else {
-          if (localVersion == version) {
-            return docCollection;
+
+    // version we would get
+    DocCollection docCollection = null;
+
+    ClusterState.CollectionRef collRef = clusterState.get(coll);
+    if (collRef != null && !collRef.isLazilyLoaded()) {
+      docCollection = collRef.get();
+    }
+
+    if (collRef != null) {
+      Stat stateStat = zkClient.exists(collectionPath, null, true, false);
+      if (stateStat != null) {
+        version = stateStat.getVersion();
+
+        if (log.isDebugEnabled()) log.debug("version for cs is {}, local version is {}", version, docCollection == null ? null : docCollection.getZNodeVersion());
+        if (docCollection != null) {
+          int localVersion = docCollection.getZNodeVersion();
+          if (log.isDebugEnabled()) log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
+          if (docCollection.hasStateUpdates()) {
+            if (localVersion > version) {
+              return docCollection;
+            }
           }
         }
-      }
 
-//      if (lazyCollectionStates.containsKey(coll)) {
-//        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
-//        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
-//        if (cachedCollection != null) {
-//          int localVersion = cachedCollection.getZNodeVersion();
-//          if (cachedCollection.hasStateUpdates()) {
-//            if (localVersion == version) {
-//              return cachedCollection;
-//            }
-//          } else {
-//            if (localVersion == version) {
-//              return cachedCollection;
-//            }
-//          }
-//        }
-//      }
+        //      if (lazyCollectionStates.containsKey(coll)) {
+        //        LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+        //        DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+        //        if (cachedCollection != null) {
+        //          int localVersion = cachedCollection.getZNodeVersion();
+        //          if (cachedCollection.hasStateUpdates()) {
+        //            if (localVersion == version) {
+        //              return cachedCollection;
+        //            }
+        //          } else {
+        //            if (localVersion == version) {
+        //              return cachedCollection;
+        //            }
+        //          }
+        //        }
+        //      }
 
-    } else {
-      return null;
+      } else {
+        return null;
+      }
     }
     log.debug("getting latest state.json");
     Stat stat = new Stat();
@@ -2005,7 +1990,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       log.debug("no data found at state.json node");
       return null;
     }
-    DocCollection docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
+    docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
 
     return docCollection;
   }
@@ -2039,7 +2024,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    */
   public void registerCore(String collection, String coreName) {
 
-    if (log.isDebugEnabled()) log.debug("register core for collection {}", collection);
+    if (log.isDebugEnabled()) log.debug("register core for collection {} {}", collection, coreName);
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
     }
@@ -2048,21 +2033,23 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       return;
     }
 
+    AtomicReference<CollectionStateWatcher> createSw = new AtomicReference();
+
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
-        stateWatchersMap.put(collection, sw);
         sw.createWatch();
-        sw.refresh();
-        sw.refreshStateUpdates();
-        v.coreRefCount.incrementAndGet();
-        return v;
+        stateWatchersMap.put(collection, sw);
+        createSw.set(sw);
       }
       v.coreRefCount.incrementAndGet();
       return v;
     });
-
+    CollectionStateWatcher sw = createSw.get();
+    if (sw != null) {
+      sw.refresh();
+    }
   }
 
   public boolean watched(String collection) {
@@ -2085,29 +2072,35 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       throw new IllegalArgumentException("Collection cannot be null");
     }
 
-//    if (registeredCores.remove(coreName)) {
-//      return;
-//    }
+    if (registeredCores.remove(coreName)) {
+      return;
+    }
 
-    AtomicBoolean reconstructState = new AtomicBoolean(false);
+    AtomicReference<CollectionStateWatcher> reconstructState = new AtomicReference();
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) return null;
       if (v.coreRefCount.get() > 0)
         v.coreRefCount.decrementAndGet();
       if (v.canBeRemoved()) {
+        log.debug("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
         clusterState.put(collection, docRef);
-
-        reconstructState.set(true);
+        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+        if (stateWatcher != null) {
+          reconstructState.set(stateWatcher);
+        }
         return null;
       }
       return v;
     });
-    if (reconstructState.get()) {
-      constructState(null);
+
+    CollectionStateWatcher sw = reconstructState.get();
+    if (sw != null) {
+      sw.removeWatch();
+      IOUtils.closeQuietly(sw);
     }
   }
 
@@ -2157,34 +2150,26 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
     }
-
+    AtomicReference<CollectionStateWatcher> watchSet = new AtomicReference<>();
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
-        log.debug("creating CollectionStateWatcher and refreshing for {}", collection);
+        log.debug("creating CollectionStateWatcher for {} and refreshing", collection);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
-        stateWatchersMap.put(collection, sw);
-
         sw.createWatch();
-        sw.refresh();
-        sw.refreshStateUpdates();
+        stateWatchersMap.put(collection, sw);
+        watchSet.set(sw);
       }
-      log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
       v.stateWatchers.add(docCollectionWatcher);
+      log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
       return v;
     });
 
-    DocCollection state = getCollectionOrNull(collection);
-    boolean remove;
-     try {
-       remove = docCollectionWatcher.onStateChanged(state);
-     } catch (Exception e) {
-       log.error("Exception running DocCollectionWatcher collection={}");
-       remove = true;
-     }
-     if (remove) {
-       removeDocCollectionWatcher(collection, docCollectionWatcher);
-     }
+    CollectionStateWatcher sw = watchSet.get();
+    if (sw != null) {
+      sw.refresh();
+      constructState(null, "registerDocCollectionWatcher");
+    }
 
   }
 
@@ -2225,23 +2210,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
 
-//    DocCollection coll = getCollectionOrNull(collection);
-//    if (predicate.matches(liveNodes, coll)) {
-//      return;
-//    }
+    DocCollection coll = getCollectionOrNull(collection);
+    if (predicate.matches(getLiveNodes(), coll)) {
+      return;
+    }
     final CountDownLatch latch = new CountDownLatch(1);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
     registerCollectionStateWatcher(collection, watcher);
     try {
-
       // wait for the watcher predicate to return true, or time out
       if (!latch.await(wait, unit)) {
-        DocCollection coll = getCollectionOrNull(collection);
-        if (predicate.matches(getLiveNodes(), coll)) {
-          return;
-        }
-
         throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + "live=" + liveNodes
                 + docCollection.get());
       }
@@ -2255,11 +2234,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
+    waitForActiveCollection(collection, wait, unit, false, shards, totalReplicas, true);
+  }
+
+  public void waitForActiveCollection(String collection, long wait, TimeUnit unit, boolean justLeaders,  int shards, int totalReplicas, boolean exact) {
     log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
         clusterState.size());
 
     assert collection != null;
-    CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
+    CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(justLeaders, shards, totalReplicas, exact);
 
     AtomicReference<DocCollection> state = new AtomicReference<>();
     AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
@@ -2314,9 +2297,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     try {
       // wait for the watcher predicate to return true, or time out
       if (!latch.await(wait, unit))
-        if (predicate.matches(liveNodes)) {
-          return;
-        }
         throw new TimeoutException("Timeout waiting for live nodes, currently they are: " + liveNodes);
 
     } finally {
@@ -2356,7 +2336,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @see #registerDocCollectionWatcher
    */
   public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
-    log.debug("remove watcher for collection {}", collection);
 
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
@@ -2367,8 +2346,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) return null;
       v.stateWatchers.remove(watcher);
+      log.debug("remove watcher for collection {} currentCount={}", collection, v.stateWatchers.size());
       if (v.canBeRemoved()) {
-        log.trace("no longer watch collection {}", collection);
+        log.debug("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
@@ -2384,7 +2364,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       return v;
     });
     if (reconstructState.get()) {
-      constructState(null);
+   //   constructState(null, "removeDocCollectionWatcher");
     }
   }
 
@@ -2406,57 +2386,138 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   // returns true if the state has changed
-  private boolean updateWatchedCollection(String coll, DocCollection newState, boolean live) {
+  private boolean updateWatchedCollection(String coll, ClusterState.CollectionRef newState) {
+    log.trace("updateWatchedCollection for [{}] [{}]", coll, newState);
     try {
       if (newState == null) {
         if (log.isDebugEnabled()) log.debug("Removing cached collection state for [{}]", coll);
-        watchedCollectionStates.remove(coll);
-        return true;
+        DocCollection prev = watchedCollectionStates.remove(coll);
+        if (prev != null) {
+          CollectionStateWatcher sw = stateWatchersMap.remove(coll);
+          if (sw != null) {
+            IOUtils.closeQuietly(sw);
+            sw.removeWatch();
+          }
+          clusterState.remove(coll);
+          collectionStateLocks.remove(coll);
+//          LazyCollectionRef lazyRef = new LazyCollectionRef(coll);
+//          lazyCollectionStates.put(coll, lazyRef);
+//          clusterState.put(coll, lazyRef);
+
+          if (collectionRemoved != null) {
+            collectionRemoved.removed(coll);
+          }
+        }
+        constructState(null, "updateWatchedCollection");
+        return false;
       }
 
-//      if (live) {
-//        return true;
-//      }
+      DocCollection newDocState = newState.get();
+      AtomicBoolean update = new AtomicBoolean();
+      clusterState.compute(coll, (k, v) -> {
 
-      boolean updated = false;
-      // CAS update loop
-      while (true) {
-        if (!collectionWatches.containsKey(coll)) {
-          break;
+        if (v == null) {
+          log.debug("new state, update");
+          update.set(true);
+          LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+          makeStateWatcher(coll, newDocState, prev);
+          return newState;
         }
-        DocCollection oldState = watchedCollectionStates.get(coll);
-        if (oldState == null) {
-          if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
-            if (log.isDebugEnabled()) {
-              log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
-            }
-            updated = true;
-            break;
-          }
-        } else {
-          if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
-            // no change to state, but we might have been triggered by the addition of a
-            // state watcher, so run notifications
-            updated = true;
-            break;
-          }
-          if (watchedCollectionStates.replace(coll, oldState, newState)) {
-            if (log.isDebugEnabled()) {
-              log.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
-            }
-            updated = true;
-            break;
+
+        ClusterState.CollectionRef docCollRef = clusterState.get(coll);
+        if (docCollRef == null) {
+          log.debug("no previous state found, update and notify");
+          update.set(true);
+          makeStateWatcher(coll, newDocState, null);
+          return newState;
+        }
+
+        if (docCollRef.isLazilyLoaded()) {
+          if (watchedCollectionStates.containsKey(coll)) {
+            update.set(true);
+            LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+            makeStateWatcher(coll, newDocState, prev);
+            return newState;
           }
         }
-      }
 
-      return true;
+        if (docCollRef.isLazilyLoaded()) {
+          log.debug("state was lazy loaded");
+          update.set(false);
+          return newState;
+        }
+
+        DocCollection docColl = docCollRef.get();
+        if (docColl == null) {
+          log.debug("null lazy state found, update and notify");
+          update.set(true);
+          LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+          makeStateWatcher(coll, newDocState, prev);
+          return newState;
+        }
+
+        if (newDocState.hasStateUpdates() && !docColl.hasStateUpdates() && newDocState.getZNodeVersion() >= docColl.getZNodeVersion()) {
+          log.debug("new state does have updates, replace >=");
+          update.set(true);
+          LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+          makeStateWatcher(coll, newDocState, prev);
+          return newState;
+        }
+
+        if (newDocState.hasStateUpdates() && docColl.hasStateUpdates() && newDocState.getZNodeVersion() >= docColl.getZNodeVersion() && !newDocState.getStateUpdates().equals(docColl.getStateUpdates())) {
+          log.debug("new state has same version but different updates");
+          update.set(true);
+          LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+          makeStateWatcher(coll, newDocState, prev);
+          return newState;
+        }
+
+        if (newDocState.getZNodeVersion() > docColl.getZNodeVersion()) {
+          log.debug("new state > old state, replace");
+          update.set(true);
+          LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+          makeStateWatcher(coll, newDocState, prev);
+          return newState;
+        }
+
+
+        log.debug("replace state {} {}", docColl.getZNodeVersion(), newDocState.getZNodeVersion());
+        update.set(true);
+        LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+        makeStateWatcher(coll, newDocState, prev);
+
+        return newState;
+      });
+
+      return update.get();
     } catch (Exception e) {
       log.error("Failing updating clusterstate", e);
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
   }
 
+  private void makeStateWatcher(String coll, DocCollection newDocState, LazyCollectionRef prev) {
+    AtomicReference<CollectionStateWatcher> createSw = new AtomicReference();
+    if (prev != null) {
+      watchedCollectionStates.put(coll, newDocState);
+      collectionWatches.compute(newDocState.getName(), (k, v) -> {
+        if (v == null) {
+          v = new CollectionWatch<>(newDocState.getName());
+          CollectionStateWatcher sw = new CollectionStateWatcher(newDocState.getName());
+          sw.createWatch();
+          stateWatchersMap.put(newDocState.getName(), sw);
+          createSw.set(sw);
+        }
+        v.coreRefCount.incrementAndGet();
+        return v;
+      });
+      CollectionStateWatcher sw = createSw.get();
+      if (sw != null) {
+        sw.refresh();
+      }
+    }
+  }
+
   public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
     collectionPropsObservers.compute(collection, (k, v) -> {
@@ -2507,7 +2568,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   private void notifyStateWatchers(String collection, DocCollection collectionState) {
-    log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
+    if (this.closed) {
+      log.warn("Already closed, won't notify");
+      return;
+    }
+    log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState.getName());
 
     try {
       notifications.submit(new Notification(collection, collectionState, collectionWatches));
@@ -2538,7 +2603,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         MDCLoggingContext.setNode(node);
       }
 
-      log.debug("notify on state change {}", collectionWatches.keySet());
+      log.trace("notify on state change {}", collectionWatches.keySet());
 
       List<DocCollectionWatcher> watchers = new ArrayList<>();
       collectionWatches.compute(collection, (k, v) -> {
@@ -2588,7 +2653,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   // called by createClusterStateWatchersAndUpdate()
   private void refreshAliases(AliasesManager watcher) throws KeeperException, InterruptedException {
-    constructState(null);
+    constructState(null, "refreshAliases");
     zkClient.exists(ALIASES, watcher);
     aliasesManager.update();
   }
@@ -2848,6 +2913,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas, boolean exact) {
+    return expectedShardsAndActiveReplicas(false, expectedShards, expectedReplicas, exact);
+  }
+
+  public static CollectionStatePredicate expectedShardsAndActiveReplicas(boolean justLeaders, int expectedShards, int expectedReplicas, boolean exact) {
     return (liveNodes, collectionState) -> {
       if (collectionState == null)
         return false;
@@ -2877,14 +2946,46 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         if (leader == null) {
           log.debug("slice={}", slice);
           return false;
+        } else {
+          if (leader.getState() != Replica.State.ACTIVE) {
+            return false;
+          }
+//          CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+//          prepCmd.setCoreName(leader.getName());
+//          prepCmd.setLeaderName(leader.getName());
+//          prepCmd.setCollection(collectionState.getName());
+//          prepCmd.setShardId(slice.getName());
+//
+//          int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+//
+//          try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).markInternalRequest().build()) {
+//
+//            prepCmd.setBasePath(leader.getBaseUrl());
+//
+//            try {
+//              NamedList<Object> result = client.request(prepCmd);
+//            } catch (SolrServerException | BaseHttpSolrClient.RemoteSolrException e) {
+//              log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+//              return false;
+//            } catch (IOException e) {
+//              log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+//              return false;
+//            }
+//          }
         }
-        for (Replica replica : slice) {
-          if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
-            activeReplicas++;
+        if (!justLeaders) {
+          for (Replica replica : slice) {
+            if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+              activeReplicas++;
+            }
           }
         }
         log.trace("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
       }
+
+      if (justLeaders) {
+        return true;
+      }
       if (!exact) {
         if (activeReplicas >= expectedReplicas) {
           return true;
@@ -3083,7 +3184,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     public void close() throws IOException {
       try {
         zkClient.removeWatches(path, this, WatcherType.Any, true);
-      } catch (KeeperException.NoWatcherException e) {
+      } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
 
       } catch (Exception e) {
         log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
index d5c2f90..19ce16d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class ObjectReleaseTracker {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static Map<Object,ObjectTrackerException> OBJECTS = new ConcurrentHashMap<>(64, 0.75f, 1);
+  public static Map<Object,ObjectTrackerException> OBJECTS = new ConcurrentHashMap<>(128, 0.75f, 5);
 
 
   protected final static ThreadLocal<StringBuilder> THREAD_LOCAL_SB = new ThreadLocal<>();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 95fd03e..7be6ad6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -64,7 +64,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
    */
   private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
   private final AtomicLong _lastShrink = new AtomicLong();
-  private final Map<Runnable,Future> _threads = new ConcurrentHashMap<>(256);
+  private final Map<Runnable,Future> _threads = new ConcurrentHashMap<>(256, 0.75f, SysStats.PROC_COUNT);
 
   private final Set<Future> _threadFutures = ConcurrentHashMap.newKeySet();
 
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index b66a644..eb66667 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -328,7 +328,7 @@ public class SolrTestCase extends Assert {
     System.setProperty("useCompoundFile", "true");
     System.setProperty("solr.tests.maxBufferedDocs", "1000");
 
-    System.setProperty("solr.getleader.looptimeout", "500");
+    System.setProperty("solr.getleader.looptimeout", "1500");
 
     System.setProperty("pkiHandlerPrivateKeyPath", SolrTestCaseJ4.class.getClassLoader().getResource("cryptokeys/priv_key512_pkcs8.pem").toExternalForm());
     System.setProperty("pkiHandlerPublicKeyPath", SolrTestCaseJ4.class.getClassLoader().getResource("cryptokeys/pub_key512.der").toExternalForm());
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index d47ffc6..53b09bb 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -858,12 +858,12 @@ public class MiniSolrCloudCluster {
 
   public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
     if (collection == null) throw new IllegalArgumentException("null collection");
-    waitForActiveCollection(collection,  120, TimeUnit.SECONDS, shards, totalReplicas);
+    waitForActiveCollection(collection,  60, TimeUnit.SECONDS, shards, totalReplicas);
   }
 
   public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
 
-    waitForActiveCollection(collection,  120, TimeUnit.SECONDS, shards, totalReplicas, exact);
+    waitForActiveCollection(collection,  60, TimeUnit.SECONDS, shards, totalReplicas, exact);
   }
 
   public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
@@ -881,7 +881,6 @@ public class MiniSolrCloudCluster {
       log.info("waitForNode: {}", runner.getNodeName());
     }
 
-
     ZkStateReader reader = getSolrClient().getZkStateReader();
 
     try {