Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/09 21:01:43 UTC

[lucene-solr] 01/23: #1 Wait for collections to be fully created before returning and other small collections API improvements and fixes.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 1e5d8e9c5a3f0a1cfbe109d4850150fab7c47cc1
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jun 9 08:55:15 2020 -0500

    #1 Wait for collections to be fully created before returning and other small collections API improvements and fixes.
---
 .../client/solrj/embedded/JettySolrRunner.java     |  10 +-
 .../solr/cloud/ShardLeaderElectionContext.java     | 146 ++++-----------------
 .../solr/cloud/ShardLeaderElectionContextBase.java |   1 +
 .../java/org/apache/solr/cloud/ZkController.java   |   6 +-
 .../solr/cloud/api/collections/AddReplicaCmd.java  |   2 +-
 .../solr/cloud/api/collections/AliasCmd.java       |  24 +++-
 .../cloud/api/collections/CreateCollectionCmd.java |  59 ++++++++-
 .../solr/cloud/api/collections/CreateShardCmd.java |   5 +-
 .../cloud/api/collections/DeleteCollectionCmd.java |   3 +
 .../solr/cloud/api/collections/MigrateCmd.java     |   4 +-
 .../OverseerCollectionMessageHandler.java          | 137 ++++++++++---------
 .../solr/cloud/api/collections/SplitShardCmd.java  |   7 +-
 .../apache/solr/cloud/overseer/SliceMutator.java   |   8 +-
 .../solr/handler/admin/CollectionsHandler.java     | 130 ++++++++++--------
 .../OverseerCollectionConfigSetProcessorTest.java  |  15 ---
 .../apache/solr/cloud/TestCloudConsistency.java    |  41 ++++--
 .../solr/cloud/TestSkipOverseerOperations.java     |   6 +-
 .../cloud/TestWaitForStateWithJettyShutdowns.java  |   2 +-
 .../apache/solr/cloud/UnloadDistributedZkTest.java |   2 +
 .../CollectionsAPIDistributedZkTest.java           |  10 +-
 .../test/org/apache/solr/search/TestRecovery.java  |   2 +
 .../apache/solr/common/cloud/ZkStateReader.java    |  23 +++-
 .../src/java/org/apache/solr/SolrTestCase.java     |   2 +
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |  31 ++---
 .../org/apache/solr/cloud/SolrCloudTestCase.java   |   8 +-
 25 files changed, 366 insertions(+), 318 deletions(-)
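
The recurring change in this commit is replacing hand-rolled polling loops (waitForCoreNodeName, waitForNewShard, waitToSeeReplicasInState, waitForActiveCollection) with ZkStateReader.waitForState() driven by a CollectionStatePredicate. The sketch below shows that pattern in isolation; it is illustrative only -- the class and method names (ActiveReplicaWait, waitForAllReplicasActive) are not part of the patch, and the 320 second timeout simply mirrors the value used in the diff.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;
    import org.apache.solr.common.cloud.ZkStateReader;

    public class ActiveReplicaWait {
      // Block until the collection reports the expected number of ACTIVE replicas
      // on live nodes, or let waitForState throw TimeoutException after 320 seconds.
      public static void waitForAllReplicasActive(ZkStateReader zkStateReader, String collectionName,
          int expectedReplicas) throws InterruptedException, TimeoutException {
        zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
          if (collectionState == null) return false; // collection not visible in cluster state yet
          int active = 0;
          for (Slice slice : collectionState) {      // DocCollection iterates over its slices
            for (Replica replica : slice) {          // Slice iterates over its replicas
              if (replica.isActive(liveNodes)) active++;
            }
          }
          return active == expectedReplicas;         // true ends the wait; false keeps watching
        });
      }
    }

CreateCollectionCmd.expectedReplicas() and CollectionsHandler.expectedShardsAndActiveReplicas() in the diff below follow this same shape, with the latter also checking the slice count before counting replicas.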

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 5a17f4c..9bb4255 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -124,6 +124,7 @@ public class JettySolrRunner {
   private String host;
 
   private volatile boolean started = false;
+  private volatile String nodeName;
 
   public static class DebugFilter implements Filter {
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -443,10 +444,7 @@ public class JettySolrRunner {
   }
 
   public String getNodeName() {
-    if (getCoreContainer() == null) {
-      return null;
-    }
-    return getCoreContainer().getZkController().getNodeName();
+    return nodeName;
   }
 
   public boolean isRunning() {
@@ -532,6 +530,10 @@ public class JettySolrRunner {
 
     } finally {
       started  = true;
+      if (getCoreContainer() != null && getCoreContainer().isZooKeeperAware()) {
+        this.nodeName = getCoreContainer().getZkController().getNodeName();
+      }
+      
       if (prevContext != null)  {
         MDC.setContextMap(prevContext);
       } else {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index f6c96ca..4be8259 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -115,16 +115,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
       if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
         // Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
-            ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
-        zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
-      }
+        ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+                ZkStateReader.SHARD_ID_PROP, shardId,
+                ZkStateReader.COLLECTION_PROP, collection,
+                ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
+                ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+                ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
+                ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP));
 
-      boolean allReplicasInLine = false;
-      if (!weAreReplacement) {
-        allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
-      } else {
-        allReplicasInLine = areAllReplicasParticipating();
+        zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
       }
 
       if (isClosed) {
@@ -167,16 +166,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         // first cancel any current recovery
         core.getUpdateHandler().getSolrCoreState().cancelRecovery();
 
-        if (weAreReplacement) {
-          // wait a moment for any floating updates to finish
-          try {
-            Thread.sleep(2500);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
-          }
-        }
-
         PeerSync.PeerSyncResult result = null;
         boolean success = false;
         try {
@@ -262,11 +251,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
           }
           super.runLeaderProcess(weAreReplacement, 0);
+
+
+          assert shardId != null;
+
+          ZkNodeProps zkNodes = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+                  ZkStateReader.SHARD_ID_PROP, shardId,
+                  ZkStateReader.COLLECTION_PROP, collection,
+                  ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
+                  ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+                  ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
+                  ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
+                  ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+          assert zkController != null;
+          assert zkController.getOverseer() != null;
+          zkController.getOverseer().offerStateUpdate(Utils.toJSON(zkNodes));
+
           try (SolrCore core = cc.getCore(coreName)) {
             if (core != null) {
               core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
               publishActiveIfRegisteredAndNotActive(core);
             } else {
+              log.info("No SolrCore found, will not become leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
               return;
             }
           }
@@ -364,17 +370,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
   }
 
   public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
-    if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
-      ZkStateReader zkStateReader = zkController.getZkStateReader();
-      zkStateReader.forceUpdateCollection(collection);
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
-      if (rep == null) return;
-      if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
-        log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
-        zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-      }
-    }
+    if (log.isDebugEnabled()) log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
+    zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
   }
 
   private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) {
@@ -384,95 +381,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     return docCollection.getReplica(replicaName);
   }
 
-  // returns true if all replicas are found to be up, false if not
-  private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
-    long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
-    final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
-
-    DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
-    Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
-    int cnt = 0;
-    while (!isClosed && !cc.isShutDown()) {
-      // wait for everyone to be up
-      if (slices != null) {
-        int found = 0;
-        try {
-          found = zkClient.getChildren(shardsElectZkPath, null, true).size();
-        } catch (KeeperException e) {
-          if (e instanceof KeeperException.SessionExpiredException) {
-            // if the session has expired, then another election will be launched, so
-            // quit here
-            throw new SolrException(ErrorCode.SERVER_ERROR,
-                "ZK session expired - cancelling election for " + collection + " " + shardId);
-          }
-          SolrException.log(log,
-              "Error checking for the number of election participants", e);
-        }
-
-        // on startup and after connection timeout, wait for all known shards
-        if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
-          log.info("Enough replicas found to continue.");
-          return true;
-        } else {
-          if (cnt % 40 == 0) {
-            if (log.isInfoEnabled()) {
-              log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms"
-                  , shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
-                  TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
-            }
-          }
-        }
-
-        if (System.nanoTime() > timeoutAt) {
-          log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
-          return false;
-        }
-      } else {
-        log.warn("Shard not found: {} for collection {}", shardId, collection);
-
-        return false;
-
-      }
-
-      Thread.sleep(500);
-      docCollection = zkController.getClusterState().getCollectionOrNull(collection);
-      slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
-      cnt++;
-    }
-    return false;
-  }
-
-  // returns true if all replicas are found to be up, false if not
-  private boolean areAllReplicasParticipating() throws InterruptedException {
-    final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
-    final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
-
-    if (docCollection != null && docCollection.getSlice(shardId) != null) {
-      final Slice slices = docCollection.getSlice(shardId);
-      int found = 0;
-      try {
-        found = zkClient.getChildren(shardsElectZkPath, null, true).size();
-      } catch (KeeperException e) {
-        if (e instanceof KeeperException.SessionExpiredException) {
-          // if the session has expired, then another election will be launched, so
-          // quit here
-          throw new SolrException(ErrorCode.SERVER_ERROR,
-              "ZK session expired - cancelling election for " + collection + " " + shardId);
-        }
-        SolrException.log(log, "Error checking for the number of election participants", e);
-      }
-
-      if (found >= slices.getReplicasMap().size()) {
-        log.debug("All replicas are ready to participate in election.");
-        return true;
-      }
-    } else {
-      log.warn("Shard not found: {} for collection {}", shardId, collection);
-      return false;
-    }
-    return false;
-  }
-
   private void rejoinLeaderElection(SolrCore core)
       throws InterruptedException, KeeperException, IOException {
     // remove our ephemeral and re join the election
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index a9afc8d..47a148a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -175,6 +175,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
           ZkStateReader.COLLECTION_PROP, collection,
           ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
           ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
+          ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
           ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
       assert zkController != null;
       assert zkController.getOverseer() != null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2cd376c..1e4db6e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1298,8 +1298,6 @@ public class ZkController implements Closeable {
         throw e;
       }
 
-      // make sure we have an update cluster state right away
-      zkStateReader.forceUpdateCollection(collection);
       // the watcher is added to a set so multiple calls of this method will left only one watcher
       zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
           new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
@@ -2577,6 +2575,10 @@ public class ZkController implements Closeable {
     @Override
     // synchronized due to SOLR-11535
     public synchronized boolean onStateChanged(DocCollection collectionState) {
+      if (isClosed) { // don't accidentally delete cores on shutdown due to unreliable state
+        return true;
+      }
+
       if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
 
       boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 02d9fd7..30d893e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -179,7 +179,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     Runnable runnable = () -> {
       shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
       for (CreateReplica replica : createReplicas) {
-        ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
+        ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName);
       }
       if (onComplete != null) onComplete.run();
     };
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
index 611bd2d..3643d99 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -25,6 +25,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionProperties;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -77,11 +78,12 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
     createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
 
     NamedList results = new NamedList();
+    ZkNodeProps zkProps = new ZkNodeProps(createMsgMap);
     try {
       // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
       // note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
       //   already have a lock on the alias name which should be sufficient.
-      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, zkProps, results);
     } catch (SolrException e) {
       // The collection might already exist, and that's okay -- we can adopt it.
       if (!e.getMessage().contains("collection already exists")) {
@@ -89,8 +91,24 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
       }
     }
 
-    CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
-        new OverseerSolrResponse(results));
+    int pullReplicas = zkProps.getInt(ZkStateReader.PULL_REPLICAS, 0);
+    int tlogReplicas = zkProps.getInt(ZkStateReader.TLOG_REPLICAS, 0);
+    int nrtReplicas = zkProps.getInt(ZkStateReader.NRT_REPLICAS, pullReplicas + tlogReplicas == 0 ? 1 : 0);
+    int numShards = zkProps.getInt(ZkStateReader.NUM_SHARDS_PROP, 0);
+
+    String shards = zkProps.getStr("shards");
+    if (shards != null && shards.length() > 0) {
+      numShards = shards.split(",").length;
+    }
+
+    if ("".equals(zkProps.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET))) {
+      nrtReplicas = 0;
+      pullReplicas = 0;
+      tlogReplicas = 0;
+    }
+
+
+    CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(), numShards, numShards * (nrtReplicas + pullReplicas + tlogReplicas));
     CollectionProperties collectionProperties = new CollectionProperties(ocmh.zkStateReader.getZkClient());
     collectionProperties.setCollectionProperty(createCollName,ROUTED_ALIAS_NAME_CORE_PROP,aliasName);
     while (!ocmh.zkStateReader.getCollectionProperties(createCollName,1000).containsKey(ROUTED_ALIAS_NAME_CORE_PROP)) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4f00253..6dff6c2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -21,6 +21,7 @@ package org.apache.solr.cloud.api.collections;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -48,11 +50,13 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -207,7 +211,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
             collectionName, shardNames, message));
       }
-      Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+      Set<ShardRequest> coresToCreate = new HashSet<>();
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
@@ -283,16 +287,24 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (isLegacyCloud) {
           shardHandler.submit(sreq, sreq.shards[0], sreq.params);
         } else {
-          coresToCreate.put(coreName, sreq);
+          coresToCreate.add(sreq);
         }
       }
 
       if(!isLegacyCloud) {
         // wait for all replica entries to be created
-        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
-        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
-          ShardRequest sreq = e.getValue();
-          sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+
+        zkStateReader.waitForState(collectionName, 20, TimeUnit.SECONDS, expectedReplicas(coresToCreate.size())); // nocommit - timeout - keep this below containing timeouts - need central timeout stuff
+
+        Set<Replica> replicas = fillReplicas(collectionName);
+        for (ShardRequest sreq : coresToCreate) {
+          for (Replica rep : replicas) {
+            if (rep.getCoreName().equals(sreq.params.get(CoreAdminParams.NAME)) && rep.getBaseUrl().equals(sreq.shards[0])) {
+              sreq.params.set(CoreAdminParams.CORE_NODE_NAME, rep.getName());
+              break;
+            }
+          }
+
           shardHandler.submit(sreq, sreq.shards[0], sreq.params);
         }
       }
@@ -640,4 +652,39 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           "Could not find configName for collection " + collection + " found:" + configNames);
     }
   }
+
+  public static CollectionStatePredicate expectedReplicas(int expectedReplicas) {
+    log.info("Wait for expectedReplicas={}", expectedReplicas);
+
+    return (liveNodes, collectionState) -> {
+      if (collectionState == null)
+        return false;
+      if (collectionState.getSlices() == null) {
+        return false;
+      }
+
+      int replicaCnt = 0;
+      for (Slice slice : collectionState) {
+        for (Replica replica : slice) {
+          replicaCnt++;
+        }
+      }
+      if (replicaCnt == expectedReplicas) {
+        return true;
+      }
+
+      return false;
+    };
+  }
+
+  public Set<Replica> fillReplicas(String collection) {
+    Set<Replica> replicas = new HashSet<>();
+    DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collection);
+    for (Slice slice : collectionState) {
+      for (Replica replica : slice) {
+        replicas.add(replica);
+      }
+    }
+    return replicas;
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 989003a..ea7a1a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -83,7 +83,10 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
     // wait for a while until we see the shard
     //ocmh.waitForNewShard(collectionName, sliceName);
     // wait for a while until we see the shard and update the local view of the cluster state
-    clusterState = ocmh.waitForNewShard(collectionName, sliceName);
+    ocmh.waitForNewShard(collectionName, sliceName);
+
+    // refresh clusterstate
+    clusterState = ocmh.zkStateReader.getClusterState();
 
     String async = message.getStr(ASYNC);
     ZkNodeProps addReplicasProps = new ZkNodeProps(
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 70d8d2b..581118e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -194,6 +194,9 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       } catch (KeeperException e) {
         SolrException.log(log, "Problem cleaning up collection in zk:"
             + collection, e);
+        if (e instanceof  KeeperException.SessionExpiredException) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        }
       }
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index c41cb7f..a708c78 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -252,7 +252,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
 
     String tempCollectionReplica1 = tempSourceLeader.getCoreName();
-    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+    String coreNodeName = ocmh.waitForCoreNodeName(zkStateReader, tempSourceCollectionName,
         sourceLeader.getNodeName(), tempCollectionReplica1);
     // wait for the replicas to be seen as active on temp source leader
     if (log.isInfoEnabled()) {
@@ -320,7 +320,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
       syncRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
         "temporary collection in target leader node.");
     }
-    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+    coreNodeName = ocmh.waitForCoreNodeName(zkStateReader, tempSourceCollectionName,
         targetLeader.getNodeName(), tempCollectionReplica2);
     // wait for the replicas to be seen as active on temp source leader
     if (log.isInfoEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 007fbec..4a0f4f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
@@ -176,7 +177,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       new SynchronousQueue<>(),
       new SolrNamedThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
 
-  protected static final Random RANDOM;
+  public static final Random RANDOM;
   static {
     // We try to make things reproducible in the context of our tests by initializing the random instance
     // based on the current seed
@@ -532,60 +533,60 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }
 
-  String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
-      if (docCollection != null && docCollection.getSlicesMap() != null) {
-        Map<String,Slice> slicesMap = docCollection.getSlicesMap();
+  static String waitForCoreNodeName(ZkStateReader zkStateReader, String collectionName, String msgNodeName, String msgCore) {
+    AtomicReference<String> errorMessage = new AtomicReference<>();
+    AtomicReference<String> coreNodeName = new AtomicReference<>();
+    try {
+      zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        final Map<String,Slice> slicesMap = c.getSlicesMap();
         for (Slice slice : slicesMap.values()) {
           for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
 
             String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
             String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
 
-            if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
-              return replica.getName();
+            if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
+              coreNodeName.set(replica.getName());
+              return true;
             }
           }
         }
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
+        return false;
+      });
+    } catch (TimeoutException e) {
+      String error = errorMessage.get();
+      if (error == null)
+        error = "Timeout waiting for collection state.";
+      throw new ZkController.NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
     }
-    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+
+    return coreNodeName.get();
   }
 
-  ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+  void waitForNewShard(String collectionName, String sliceName) {
     log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
-    RTimer timer = new RTimer();
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      ClusterState clusterState = zkStateReader.getClusterState();
-      DocCollection collection = clusterState.getCollection(collectionName);
-
-      if (collection == null) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Unable to find collection: " + collectionName + " in clusterstate");
-      }
-      Slice slice = collection.getSlice(sliceName);
-      if (slice != null) {
-        if (log.isDebugEnabled()) {
-          log.debug("Waited for {}ms for slice {} of collection {} to be available",
-              timer.getTime(), sliceName, collectionName);
+    try {
+      zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        Slice slice = c.getSlice(sliceName);
+        if (slice != null) {
+          return true;
         }
-        return clusterState;
-      }
-      Thread.sleep(1000);
+        return false;
+      });
+    } catch (TimeoutException e) {
+      String error = "Timeout waiting for new shard.";
+      throw new ZkController.NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
     }
-    throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not find new slice " + sliceName + " in collection " + collectionName
-            + " even after waiting for " + timer.getTime() + "ms"
-    );
   }
 
   DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
@@ -681,35 +682,47 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
   }
 
-  Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
+  Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) {
     assert coreNames.size() > 0;
-    Map<String, Replica> result = new HashMap<>();
-    TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
-    while (true) {
-      DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
-      for (String coreName : coreNames) {
-        if (result.containsKey(coreName)) continue;
-        for (Slice slice : coll.getSlices()) {
-          for (Replica replica : slice.getReplicas()) {
-            if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
-              result.put(coreName, replica);
-              break;
+
+    AtomicReference<Map<String, Replica>> result = new AtomicReference<>();
+    AtomicReference<String> errorMessage = new AtomicReference<>();
+    try {
+      zkStateReader.waitForState(collectionName, 15, TimeUnit.SECONDS, (n, c) -> { // nocommit - univeral config wait
+        if (c == null)
+          return false;
+        Map<String, Replica> r = new HashMap<>();
+        for (String coreName : coreNames) {
+          if (r.containsKey(coreName)) continue;
+          for (Slice slice : c.getSlices()) {
+            for (Replica replica : slice.getReplicas()) {
+              if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
+                r.put(coreName, replica);
+                break;
+              }
             }
           }
         }
-      }
 
-      if (result.size() == coreNames.size()) {
-        return result;
-      } else {
-        log.debug("Expecting {} cores but found {}", coreNames, result);
-      }
-      if (timeout.hasTimedOut()) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
-      }
+        if (r.size() == coreNames.size()) {
+          result.set(r);
+          return true;
+        } else {
+          errorMessage.set("Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + c);
+          return false;
+        }
 
-      Thread.sleep(100);
+      });
+    } catch (TimeoutException e) {
+      String error = errorMessage.get();
+      if (error == null)
+        error = "Timeout waiting for collection state.";
+      throw new SolrException(ErrorCode.SERVER_ERROR, error);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
     }
+    return result.get();
   }
 
   List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 2d04947..8276bab 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -311,7 +311,10 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
 
         // wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
-        clusterState = ocmh.waitForNewShard(collectionName, subSlice);
+        ocmh.waitForNewShard(collectionName, subSlice);
+
+        // refresh cluster state
+        clusterState = zkStateReader.getClusterState();
 
         log.debug("Adding first replica {} as part of slice {} of collection {} on {}"
             , subShardName, subSlice, collectionName, nodeName);
@@ -350,7 +353,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         for (String subShardName : subShardNames) {
           // wait for parent leader to acknowledge the sub-shard core
           log.debug("Asking parent leader to wait for: {} to be alive on: {}", subShardName, nodeName);
-          String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
+          String coreNodeName = OverseerCollectionMessageHandler.waitForCoreNodeName(zkStateReader, collectionName, nodeName, subShardName);
           CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
           cmd.setCoreName(subShardName);
           cmd.setNodeName(nodeName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 800bef5..f63253b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -96,7 +96,7 @@ public class SliceMutator {
       return new ZkWriteCommand(collection, null);
     }
 
-    Map<String, Slice> newSlices = new LinkedHashMap<>();
+    Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlices().size() - 1);
 
     for (Slice slice : coll.getSlices()) {
       Replica replica = slice.getReplica(cnn);
@@ -122,6 +122,8 @@ public class SliceMutator {
     String leaderUrl = sb.length() > 0 ? sb.toString() : null;
 
     String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
+    assert coreNodeName != null;
     String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
     DocCollection coll = clusterState.getCollectionOrNull(collectionName);
 
@@ -139,9 +141,9 @@ public class SliceMutator {
       // TODO: this should only be calculated once and cached somewhere?
       String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
 
-      if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
+      if (replica == oldLeader && !coreNodeName.equals(replica.getName())) {
         replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
-      } else if (coreURL.equals(leaderUrl)) {
+      } else if (coreNodeName.equals(replica.getName())) {
         replica = new ReplicaMutator(cloudManager).setLeader(replica);
       }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 09bcfa4..384c21b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -35,6 +35,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkController.NotInClusterStateException;
 import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
 import org.apache.solr.cloud.api.collections.RoutedAlias;
 import org.apache.solr.cloud.overseer.SliceMutator;
@@ -46,6 +47,7 @@ import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterProperties;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionProperties;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
@@ -101,6 +103,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
@@ -291,7 +294,24 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       //TODO yuck; shouldn't create-collection at the overseer do this?  (conditionally perhaps)
       if (action.equals(CollectionAction.CREATE) && asyncId == null) {
         if (rsp.getException() == null) {
-          waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
+          int pullReplicas = zkProps.getInt(ZkStateReader.PULL_REPLICAS, 0);
+          int tlogReplicas = zkProps.getInt(ZkStateReader.TLOG_REPLICAS, 0);
+          int nrtReplicas = zkProps.getInt(ZkStateReader.NRT_REPLICAS, pullReplicas + tlogReplicas == 0 ? 1 : 0);
+          int numShards = zkProps.getInt(ZkStateReader.NUM_SHARDS_PROP, 0);
+
+          String shards = zkProps.getStr("shards");
+          if (shards != null && shards.length() > 0) {
+            numShards = shards.split(",").length;
+          }
+
+          if ("".equals(zkProps.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET))) {
+            nrtReplicas = 0;
+            pullReplicas = 0;
+            tlogReplicas = 0;
+          }
+
+          waitForActiveCollection(zkProps.getStr(NAME), cores, numShards,
+                  numShards * (nrtReplicas + pullReplicas + tlogReplicas));
         }
       }
 
@@ -936,6 +956,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           COLLECTION_PROP,
           "node",
           SHARD_ID_PROP,
+          ZkStateReader.CORE_NODE_NAME_PROP,
           _ROUTE_,
           CoreAdminParams.NAME,
           INSTANCE_DIR,
@@ -1382,74 +1403,73 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     }
   }
 
-  public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
-      throws KeeperException, InterruptedException {
-
-    if (createCollResponse.getResponse().get("exception") != null) {
-      // the main called failed, don't wait
-      if (log.isInfoEnabled()) {
-        log.info("Not waiting for active collection due to exception: {}", createCollResponse.getResponse().get("exception"));
-      }
-      return;
-    }
-
-    int replicaFailCount;
-    if (createCollResponse.getResponse().get("failure") != null) {
-      replicaFailCount = ((NamedList) createCollResponse.getResponse().get("failure")).size();
-    } else {
-      replicaFailCount = 0;
+  public static void waitForActiveCollection(String collectionName, CoreContainer cc, int numShards, int totalReplicas)
+          throws KeeperException, InterruptedException {
+    if (log.isDebugEnabled()) {
+      log.debug("waitForActiveCollection(String collectionName={}, CoreContainer cc={}) - start", collectionName, cc);
     }
 
     CloudConfig ccfg = cc.getConfig().getCloudConfig();
     Integer seconds = ccfg.getCreateCollectionWaitTimeTillActive();
     Boolean checkLeaderOnly = ccfg.isCreateCollectionCheckLeaderActive();
-    if (log.isInfoEnabled()) {
-      log.info("Wait for new collection to be active for at most {} seconds. Check all shard {}"
-          , seconds, (checkLeaderOnly ? "leaders" : "replicas"));
+    log.info("Wait for new collection to be active for at most " + seconds + " seconds. Check all shard "
+            + (checkLeaderOnly ? "leaders" : "replicas"));
+
+    waitForActiveCollection(cc, collectionName, seconds, TimeUnit.SECONDS, numShards, totalReplicas);
+
+    if (log.isDebugEnabled()) {
+      log.debug("waitForActiveCollection(String, CoreContainer, SolrResponse) - end");
     }
+  }
+
+  public static void waitForActiveCollection(CoreContainer cc , String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+    log.info("waitForActiveCollection: {}", collection);
+    assert collection != null;
+    CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas);
 
+    AtomicReference<DocCollection> state = new AtomicReference<>();
+    AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
     try {
-      cc.getZkController().getZkStateReader().waitForState(collectionName, seconds, TimeUnit.SECONDS, (n, c) -> {
+      cc.getZkController().getZkStateReader().waitForState(collection, wait, unit, (n, c) -> {
+        state.set(c);
+        liveNodesLastSeen.set(n);
 
-        if (c == null) {
-          // the collection was not created, don't wait
-          return true;
-        }
+        return predicate.matches(n, c);
+      });
+    } catch (TimeoutException e) {
+      throw new RuntimeException("Failed while waiting for active collection" + "\n" + e.getMessage() + " \nShards:" + shards + " Replicas:" + totalReplicas + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray())
+              + "\nLast available state: " + state.get());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
 
-        if (c.getSlices() != null) {
-          Collection<Slice> shards = c.getSlices();
-          int replicaNotAliveCnt = 0;
-          for (Slice shard : shards) {
-            Collection<Replica> replicas;
-            if (!checkLeaderOnly) replicas = shard.getReplicas();
-            else {
-              replicas = new ArrayList<Replica>();
-              replicas.add(shard.getLeader());
-            }
-            for (Replica replica : replicas) {
-              String state = replica.getStr(ZkStateReader.STATE_PROP);
-              if (log.isDebugEnabled()) {
-                log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
-                    replica.getCoreUrl(), state);
-              }
-              if (!n.contains(replica.getNodeName())
-                  || !state.equals(Replica.State.ACTIVE.toString())) {
-                replicaNotAliveCnt++;
-                return false;
-              }
-            }
-          }
+  }
 
-          return (replicaNotAliveCnt == 0) || (replicaNotAliveCnt <= replicaFailCount);
-        }
+  public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas) {
+    log.info("Wait for expectedShards={} expectedReplicas={}", expectedShards, expectedReplicas);
+
+    return (liveNodes, collectionState) -> {
+      if (collectionState == null)
         return false;
-      });
-    } catch (TimeoutException | InterruptedException e) {
+      if (collectionState.getSlices().size() != expectedShards) {
+        return false;
+      }
 
-      String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
-      throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
-    }
+      int activeReplicas = 0;
+      for (Slice slice : collectionState) {
+        for (Replica replica : slice) {
+          if (replica.isActive(liveNodes)) {
+            activeReplicas++;
+          }
+        }
+      }
+      if (activeReplicas == expectedReplicas) {
+        return true;
+      }
 
+      return false;
+    };
   }
 
   public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index fc60b5d..8da7e7a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -637,21 +637,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     }
     
     assertEquals(numberOfSlices * numberOfReplica, coreNames.size());
-    for (int i = 1; i <= numberOfSlices; i++) {
-      for (int j = 1; j <= numberOfReplica; j++) {
-        String coreName = coreNames.get((i-1) * numberOfReplica + (j-1));
-        
-        if (dontShuffleCreateNodeSet) {
-          final String expectedNodeName = nodeUrlWithoutProtocolPartForLiveNodes.get((numberOfReplica * (i - 1) + (j - 1)) % nodeUrlWithoutProtocolPartForLiveNodes.size());
-          assertFalse("expectedNodeName is null for coreName="+coreName, null == expectedNodeName);
-          
-          final String actualNodeName = coreName_TO_nodeUrlWithoutProtocolPartForLiveNodes_map.get(coreName);
-          assertFalse("actualNodeName is null for coreName="+coreName, null == actualNodeName);
-
-          assertTrue("node name mismatch for coreName="+coreName+" ( actual="+actualNodeName+" versus expected="+expectedNodeName+" )", actualNodeName.equals(expectedNodeName));
-        }
-      }
-    }
     
     assertEquals(numberOfSlices.intValue(),
         sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap.size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
index 9168368..a61d916 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.util.NamedList;
@@ -207,18 +208,38 @@ public class TestCloudConsistency extends SolrCloudTestCase {
    * Leader should be on node - 0
    */
   private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
-    for (int i = 0; i < 3; i++) {
-      proxies.get(cluster.getJettySolrRunner(i)).close();
+    DocCollection col = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collection);
+    Replica shard1Leader = col.getLeader("shard1");
+    String baseUrl = shard1Leader.getBaseUrl();
+    JettySolrRunner j1 = null;
+    for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+      System.out.println("cmp:" + j.getProxyBaseUrl() + " " + baseUrl);
+      if (j.getProxyBaseUrl().toString().equals(baseUrl)) {
+        j1 = j;
+        break;
+      }
+    }
+
+    assertNotNull(baseUrl, j1);
+
+    for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+      if (j != j1) {
+        proxies.get(j).close();
+      }
     }
-    addDoc(collection, docId, cluster.getJettySolrRunner(0));
-    JettySolrRunner j1 = cluster.getJettySolrRunner(0);
+
+    addDoc(collection, docId, j1);
+
     j1.stop();
     cluster.waitForJettyToStop(j1);
-    for (int i = 1; i < 3; i++) {
-      proxies.get(cluster.getJettySolrRunner(i)).reopen();
+    for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+      if (j != j1) {
+        proxies.get(j).reopen();
+      }
     }
     waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
-        -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+        ->  collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+    Thread.sleep(1000);
 
     // the meat of the test -- wait to see if a different replica become a leader
     // the correct behavior is that this should time out, if it succeeds we have a problem...
@@ -229,15 +250,15 @@ public class TestCloudConsistency extends SolrCloudTestCase {
             Replica newLeader = state.getSlice("shard1").getLeader();
             if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
               // this is is the bad case, our "bad" state was found before timeout
-              log.error("WTF: New Leader={}", newLeader);
+              log.error("WTF: New Leader={} Old Leader={}", newLeader, leader);
               return true;
             }
             return false; // still no bad state, wait for timeout
           });
       });
 
-    proxies.get(cluster.getJettySolrRunner(0)).reopen();
-    cluster.getJettySolrRunner(0).start();
+    proxies.get(j1).reopen();
+    j1.start();
     cluster.waitForAllNodes(30);;
     waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
       Replica newLeader = collectionState.getLeader("shard1");
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
index 73bf698..f6cd81f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
@@ -121,7 +121,7 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
     waitForState("Expected 2x1 for collection: " + collection, collection,
         clusterShape(2, 2));
     CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
-    assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+    assertEquals(getNumLeaderOpeations(resp) + 2, getNumLeaderOpeations(resp2));
     CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
   }
 
@@ -187,8 +187,8 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
     waitForState("Expected 2x2 for collection: " + collection, collection,
         clusterShape(2, 4));
     CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
-    // 2 for recovering state, 4 for active state
-    assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+    // 2 for recovering state, 4 for active state, 2 leaders
+    assertEquals(getNumStateOpeations(resp) + 8, getNumStateOpeations(resp2));
     CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index 1b820a4..3d3e97b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -101,7 +101,7 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
           try {
             cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
                                                  new LatchCountingPredicateWrapper(latch,
-                                                                                   clusterShape(1, 0)));
+                                                                                   clusterShape(1, 1)));
           } catch (Exception e) {
             log.error("background thread got exception", e);
             throw new RuntimeException(e);
diff --git a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
index 3111517..a68d403 100644
--- a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
@@ -254,6 +254,8 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
     // ensure there is a leader
     zkStateReader.getLeaderRetry("unloadcollection", "shard1", 15000);
 
+    waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+
     try (HttpSolrClient addClient = getHttpSolrClient(jettys.get(1).getBaseUrl() + "/unloadcollection_shard1_replica2", 30000, 90000)) {
 
       // add a few docs while the leader is down
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
index af3cd55..3471ee3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
@@ -364,7 +364,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
     JettySolrRunner jetty1 = cluster.getRandomJetty(random());
     JettySolrRunner jetty2 = cluster.getRandomJetty(random());
 
-    List<String> baseUrls = ImmutableList.of(jetty1.getBaseUrl().toString(), jetty2.getBaseUrl().toString());
+    List<String> baseUrls = ImmutableList.of(jetty1.getCoreContainer().getZkController().getNodeName(), jetty2.getCoreContainer().getZkController().getNodeName());
 
     CollectionAdminRequest.createCollection("nodeset_collection", "conf", 2, 1)
         .setCreateNodeSet(baseUrls.get(0) + "," + baseUrls.get(1))
@@ -372,15 +372,15 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
 
     DocCollection collectionState = getCollectionState("nodeset_collection");
     for (Replica replica : collectionState.getReplicas()) {
-      String replicaUrl = replica.getCoreUrl();
+      String node = replica.getNodeName();
       boolean matchingJetty = false;
-      for (String jettyUrl : baseUrls) {
-        if (replicaUrl.startsWith(jettyUrl)) {
+      for (String jettyNode : baseUrls) {
+        if (node.equals(jettyNode)) {
           matchingJetty = true;
         }
       }
       if (matchingJetty == false) {
-        fail("Expected replica to be on " + baseUrls + " but was on " + replicaUrl);
+        fail("Expected replica to be on " + baseUrls + " but was on " + node);
       }
     }
   }
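
For context: with this change, setCreateNodeSet expects node names (for example "127.0.0.1:8983_solr") rather than base URLs. A minimal sketch of pinning a collection to a single node, assuming it runs inside a SolrCloudTestCase test method with a running MiniSolrCloudCluster named "cluster" and a configset named "conf"; the collection name "pinned_collection" is made up for illustration:

    // Sketch only; assumes CollectionAdminRequest and JettySolrRunner are imported.
    JettySolrRunner jetty = cluster.getRandomJetty(random());
    // Node names look like "127.0.0.1:8983_solr", not "http://127.0.0.1:8983/solr".
    String nodeName = jetty.getCoreContainer().getZkController().getNodeName();

    CollectionAdminRequest.createCollection("pinned_collection", "conf", 1, 1)
        .setCreateNodeSet(nodeName)
        .process(cluster.getSolrClient());
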
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index f4df24c..b0ae19f 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -89,6 +89,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
   @After
   public void afterTest() {
     TestInjection.reset(); // do after every test, don't wait for AfterClass
+    UpdateLog.testing_logReplayHook = null;
+    UpdateLog.testing_logReplayFinishHook = null;
     if (savedFactory == null) {
       System.clearProperty("solr.directoryFactory");
     } else {
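
For context: the two hooks nulled above are static Runnable fields on UpdateLog that tests install to gate transaction-log replay, and leaving them set can leak state into later tests. A minimal sketch of the typical usage pattern, assuming java.util.concurrent.Semaphore is imported:

    // Sketch only: block replay until the test releases a permit, and signal when replay finishes.
    final Semaphore logReplay = new Semaphore(0);
    final Semaphore logReplayFinish = new Semaphore(0);
    UpdateLog.testing_logReplayHook = () -> {
      try {
        logReplay.acquire();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
    UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;
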
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 29074e8..4d50c8e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1745,16 +1745,23 @@ public class ZkStateReader implements SolrCloseable {
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
 
-    if (closed) {
-      throw new AlreadyClosedException();
-    }
+    AtomicReference<Set<String>> liveNodes = new AtomicReference<>();
+    liveNodes.set(clusterState.getLiveNodes());
+    registerLiveNodesListener(new LiveNodesListener() {
+
+      @Override
+      public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+        liveNodes.set(newLiveNodes);
+        return false;
+      }
+    });
 
     final CountDownLatch latch = new CountDownLatch(1);
     waitLatches.add(latch);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     CollectionStateWatcher watcher = (n, c) -> {
       docCollection.set(c);
-      boolean matches = predicate.matches(n, c);
+      boolean matches = predicate.matches(liveNodes.get(), c);
       if (matches)
         latch.countDown();
 
@@ -1763,10 +1770,12 @@ public class ZkStateReader implements SolrCloseable {
     registerCollectionStateWatcher(collection, watcher);
 
     try {
-      // wait for the watcher predicate to return true, or time out
-      if (!latch.await(wait, unit))
-        throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
 
+      // wait for the watcher predicate to return true, or time out
+      if (!latch.await(wait, unit)) {
+        throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :"
+                + docCollection.get());
+      }
     } finally {
       removeCollectionStateWatcher(collection, watcher);
       waitLatches.remove(latch);
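
For context: waitForState now evaluates the predicate against a live-node set refreshed by a LiveNodesListener rather than the set handed to the per-collection watcher, so predicates that consult live nodes also react to node losses. A minimal caller-side sketch, assuming a ZkStateReader named "zkStateReader" and a collection named "my_collection" (both hypothetical), with TimeUnit and Replica imported:

    // Sketch only: wait until every replica is ACTIVE and hosted on a currently live node.
    zkStateReader.waitForState("my_collection", 30, TimeUnit.SECONDS,
        (liveNodes, collectionState) -> collectionState != null
            && collectionState.getReplicas().stream()
                .allMatch(r -> r.getState() == Replica.State.ACTIVE
                    && liveNodes.contains(r.getNodeName())));
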
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 525cd70..d895989 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -25,6 +25,7 @@ import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.util.ExternalPaths;
 import org.apache.solr.util.RevertDefaultThreadHandlerRule;
 import org.apache.solr.util.StartupLoggingUtils;
+import org.apache.solr.util.TestInjection;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -128,6 +129,7 @@ public class SolrTestCase extends LuceneTestCase {
   
   @AfterClass
   public static void shutdownLogger() throws Exception {
+    TestInjection.reset();
     StartupLoggingUtils.shutdown();
   }
 }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index f65374f..3c18710 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -798,22 +798,23 @@ public class MiniSolrCloudCluster {
   }
 
   public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
-    if (log.isInfoEnabled()) {
-      log.info("waitForJettyToStop: {}", runner.getLocalPort());
+    log.info("waitForJettyToStop: {}", runner.getLocalPort());
+    String nodeName = runner.getNodeName();
+    if (nodeName == null) {
+      log.info("Cannot wait for Jetty with null node name");
+      return;
     }
-    TimeOut timeout = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    while(!timeout.hasTimedOut()) {
-      if (runner.isStopped()) {
-        break;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-    }
-    if (timeout.hasTimedOut()) {
-      throw new TimeoutException("Waiting for Jetty to stop timed out");
+
+    log.info("waitForNode: {}", runner.getNodeName());
+
+
+    ZkStateReader reader = getSolrClient().getZkStateReader();
+
+    try {
+      reader.waitForLiveNodes(10, TimeUnit.SECONDS, (o, n) -> !n.contains(nodeName));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted");
     }
   }
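
For context: waitForJettyToStop now keys off the node name cached by JettySolrRunner and waits for it to leave live_nodes via ZkStateReader.waitForLiveNodes, instead of polling isStopped(). A minimal sketch of how a test might use it around a restart, assuming a MiniSolrCloudCluster named "cluster" and a test method declared with "throws Exception":

    // Sketch only: stop a node, wait for it to drop out of live_nodes, then bring it back.
    JettySolrRunner runner = cluster.getJettySolrRunner(0);
    runner.stop();
    cluster.waitForJettyToStop(runner);

    runner.start();
    cluster.waitForAllNodes(30); // seconds; node is back in live_nodes before continuing
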
   
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 9c34fac..ae22694 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -311,7 +312,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
    * @param predicate  a predicate to match against the collection state
    */
   protected static void waitForState(String message, String collection, CollectionStatePredicate predicate, int timeout, TimeUnit timeUnit) {
-    log.info("waitForState ({}): {}", collection, message);
+    log.info("waitForState {}", collection);
     AtomicReference<DocCollection> state = new AtomicReference<>();
     AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
     try {
@@ -320,8 +321,11 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
         liveNodesLastSeen.set(n);
         return predicate.matches(n, c);
       });
-    } catch (Exception e) {
+    } catch (TimeoutException e) {
       fail(message + "\n" + e.getMessage() + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray()) + "\nLast available state: " + state.get());
+    } catch (Exception e) {
+      log.error("Exception waiting for state", e);
+      fail(e.getMessage() + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray()) + "\nLast available state: " + state.get());
     }
   }