You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this text version.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 07:12:28 UTC

[lucene-solr] branch reference_impl_dev updated: @1097 And beyond. Go Overseer.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new bafc793  @1097 And beyond. Go Overseer.
bafc793 is described below

commit bafc7934c082ddb6d5e18afb3561f8a664c1a967
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Oct 31 02:11:56 2020 -0500

    @1097 And beyond. Go Overseer.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  37 +--
 .../java/org/apache/solr/cloud/ZkController.java   |   2 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 258 +++++++++++----------
 .../solr/cloud/api/collections/DeleteShardCmd.java |  27 ++-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |   7 -
 .../src/java/org/apache/solr/core/SolrCore.java    |   4 +-
 .../org/apache/solr/cloud/DeleteShardTest.java     |   5 +-
 .../apache/solr/cloud/DistributedQueueTest.java    |  66 +++---
 .../test/org/apache/solr/cloud/OverseerTest.java   |   8 +-
 .../solr/client/solrj/cloud/DistributedQueue.java  |  12 +-
 .../solrj/request/CollectionAdminRequest.java      |  10 +-
 .../java/org/apache/solr/common/util/TimeOut.java  |   8 +-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |   2 +-
 13 files changed, 245 insertions(+), 201 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 0fc4350..c5582cd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -253,7 +253,7 @@ public class Overseer implements SolrCloseable {
               // if there were any errors while processing
               // the state queue, items would have been left in the
               // work queue so let's process those first
-              byte[] data = fallbackQueue.peek();
+              byte[] data = fallbackQueue.peek(null);
               // TODO: can we do this with a builk call instead?
               while (fallbackQueueSize > 0 && data != null) {
                 final ZkNodeProps message = ZkNodeProps.load(data);
@@ -275,7 +275,7 @@ public class Overseer implements SolrCloseable {
                       log.warn(
                               "Exception when process message = {}, consider as bad message and poll out from the queue",
                               message);
-                      fallbackQueue.poll();
+                      fallbackQueue.poll(null);
                     }
                   } catch (InterruptedException e1) {
                     ParWork.propagateInterrupt(e);
@@ -285,8 +285,8 @@ public class Overseer implements SolrCloseable {
                   }
                   throw exp;
                 }
-                fallbackQueue.poll(); // poll-ing removes the element we got by peek-ing
-                data = fallbackQueue.peek();
+                fallbackQueue.poll(null); // poll-ing removes the element we got by peek-ing
+                data = fallbackQueue.peek(null);
                 fallbackQueueSize--;
               }
               // force flush at the end of the loop, if there are no pending updates, this is a no op call
@@ -337,28 +337,37 @@ public class Overseer implements SolrCloseable {
           }
           try {
             Set<String> processedNodes = new HashSet<>();
-            TimeOut timeout = new TimeOut(1, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+            TimeOut timeout = new TimeOut(3, TimeUnit.SECONDS, TimeSource.NANO_TIME);
             while (queue != null && !queue.isEmpty()) {
               if (isClosed()) {
                 log.info("Closing");
                 return;
               }
-              if (timeout.hasTimedOut()) {
-                break;
-              }
               for (Pair<String, byte[]> head : queue) {
                 byte[] data = head.second();
-                final ZkNodeProps message = ZkNodeProps.load(data);
-                if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
-                if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
-                processedNodes.add(new File(head.first()).getName());
-                // The callback always be called on this thread
-                processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
+
+                if (log.isDebugEnabled()) log.debug("look at node {} data={}", head.first(), head.second() == null ? null : head.second().length);
+                if (head.second() != null && head.second().length > 0) {
+                  final ZkNodeProps message = ZkNodeProps.load(data);
+                  if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
+                  if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
+                  processedNodes.add(new File(head.first()).getName());
+                  // The callback always be called on this thread
+                  processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
+                } else {
+                  log.warn("Found queue item with no data, removing it {} : {}", head.first(), new File(head.first()).getName());
+                  processedNodes.add(new File(head.first()).getName());
+                }
               }
               if (isClosed()) {
                 log.info("Overseer closed, exiting loop");
                 return;
               }
+
+              if (timeout.hasTimedOut()) {
+                if (log.isDebugEnabled()) log.debug("timeout, skipping out on tight loop {}", timeout.getInterval(TimeUnit.MILLISECONDS));
+                break;
+              }
               // if an event comes in the next *ms batch it together
               int wait = 10;
               if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index ab3339c..a52567b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -2728,7 +2728,7 @@ public class ZkController implements Closeable, Runnable {
       if (replicaRemoved) {
         try {
           log.info("Replica {} removed from clusterstate, remove it.", coreName);
-          // getCoreContainer().unload(coreName, true, true, true); // nocommit - this causes bad things in tests
+          getCoreContainer().unload(coreName, true, true, true); // nocommit - this causes bad things in tests
         } catch (SolrException e) {
           if (!e.getMessage().contains("Cannot unload non-existent core")) {
             // no need to log if the core was already unloaded
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 140443e..ed18f61 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -98,8 +98,8 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Contains the last set of children fetched from ZK. Elements are removed from the head of
    * this in-memory set as they are consumed from the queue.  Due to the distributed nature
    * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
-   * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
-   * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+   * Therefore, methods like {@link #peek(Predicate<String>)} have to double-check actual node existence, and methods
+   * like {@link #poll(Predicate<String>)} must resolve any races by attempting to delete the underlying node.
    */
   protected volatile TreeMap<String,byte[]> knownChildren;
 
@@ -158,10 +158,10 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return data at the first element of the queue, or null.
    */
   @Override
-  public byte[] peek() throws KeeperException, InterruptedException {
+  public byte[] peek(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     Timer.Context time = stats.time(dir + "_peek");
     try {
-      return firstElement();
+      return firstElement(acceptFilter);
     } finally {
       time.stop();
     }
@@ -175,8 +175,8 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return data at the first element of the queue, or null.
    */
   @Override
-  public byte[] peek(boolean block) throws KeeperException, InterruptedException {
-    return block ? peek(Long.MAX_VALUE) : peek();
+  public byte[] peek(Predicate<String> acceptFilter, boolean block) throws KeeperException, InterruptedException {
+    return block ? peek(acceptFilter, Long.MAX_VALUE) : peek(acceptFilter);
   }
 
   /**
@@ -187,7 +187,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return data at the first element of the queue, or null.
    */
   @Override
-  public byte[] peek(long wait) throws KeeperException, InterruptedException {
+  public byte[] peek(Predicate<String> acceptFilter, long wait) throws KeeperException, InterruptedException {
     byte[] result = null;
 
     Timer.Context time;
@@ -199,7 +199,7 @@ public class ZkDistributedQueue implements DistributedQueue {
 
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
 
-    result = firstElement();
+    result = firstElement(acceptFilter);
     if (result != null) {
       return result;
     }
@@ -208,17 +208,17 @@ public class ZkDistributedQueue implements DistributedQueue {
     TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
 
     if (foundChildren.size() > 0) {
-      result = firstElement();
+      result = firstElement(acceptFilter);
       return result;
     }
 
     TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
 
-    waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+    waitForChildren(null, foundChildren, timeout, watcher);
     if (foundChildren.size() == 0) {
       return null;
     }
-    result = firstElement();
+    result = firstElement(acceptFilter);
     return result;
 
   }
@@ -230,10 +230,10 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return Head of the queue or null.
    */
   @Override
-  public byte[] poll() throws KeeperException, InterruptedException {
+  public byte[] poll(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     Timer.Context time = stats.time(dir + "_poll");
     try {
-      return removeFirst();
+      return removeFirst(acceptFilter);
     } finally {
       time.stop();
     }
@@ -245,10 +245,10 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return The former head of the queue
    */
   @Override
-  public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+  public byte[] remove(Predicate<String> acceptFilter) throws NoSuchElementException, KeeperException{
     Timer.Context time = stats.time(dir + "_remove");
     try {
-      byte[] result = removeFirst();
+      byte[] result = removeFirst(acceptFilter);
       if (result == null) {
         throw new NoSuchElementException();
       }
@@ -306,29 +306,29 @@ public class ZkDistributedQueue implements DistributedQueue {
    * @return The former head of the queue
    */
   @Override
-  public byte[] take() throws KeeperException, InterruptedException {
+  public byte[] take(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     // Same as for element. Should refactor this.
     Timer.Context timer = stats.time(dir + "_take");
     updateLock.lockInterruptibly();
     try {
       long waitNanos = TimeUnit.MILLISECONDS.toNanos(60000);
 
-      byte[] result = removeFirst();
+      byte[] result = removeFirst(acceptFilter);
       if (result != null) {
         return result;
       }
 
       ChildWatcher watcher = new ChildWatcher();
-      TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
+      TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
 
       TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
 
-      waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+      waitForChildren( s -> s.startsWith(PREFIX) || acceptFilter.test(s), foundChildren, timeout, watcher);
       if (foundChildren.size() == 0) {
         return null;
       }
 
-      result = removeFirst();
+      result = removeFirst(acceptFilter);
       if (result != null) {
         return result;
       }
@@ -348,6 +348,7 @@ public class ZkDistributedQueue implements DistributedQueue {
   @Override
   public void offer(byte[] data) throws KeeperException {
     Timer.Context time = stats.time(dir + "_offer");
+    if (log.isDebugEnabled()) log.debug("Over item to queue {}", dir);
     try {
       try {
         if (maxQueueSize > 0) {
@@ -417,22 +418,38 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Returns the name if the first known child node, or {@code null} if the queue is empty.
    * @return
    */
-  private Map.Entry<String,byte[]> firstChild(boolean remove) {
+  private Map.Entry<String,byte[]> firstChild(boolean remove, Predicate<String> acceptFilter) {
     try {
       updateLock.lockInterruptibly();
       try {
         // We always return from cache first, the cache will be cleared if the node is not exist
         if (!knownChildren.isEmpty()) {
-          return remove ? knownChildren.pollFirstEntry() : knownChildren.firstEntry();
+          for (Map.Entry<String,byte[]> entry : knownChildren.entrySet()) {
+            if (acceptFilter != null && acceptFilter.test(entry.getKey())) {
+              continue;
+            }
+            if (remove) {
+              knownChildren.remove(entry.getKey());
+              try {
+                zookeeper.delete(dir + "/" + entry.getKey(), -1);
+              } catch (KeeperException.NoNodeException e) {
+                if (log.isDebugEnabled()) log.debug("No node found for {}", entry.getKey());
+              }
+              return entry;
+            }
+          }
+          return null;
         }
-
-        return null;
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
           updateLock.unlock();
         }
       }
-    } catch (InterruptedException e) {
+
+      fetchZkChildren(null, acceptFilter);
+
+      return null;
+    } catch (InterruptedException | KeeperException e) {
       ParWork.propagateInterrupt(e);
       throw new AlreadyClosedException(e);
     }
@@ -442,7 +459,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Return the current set of children from ZK; does not change internal state.
    */
   TreeMap<String,byte[]> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
-
+    if (log.isDebugEnabled()) log.debug("fetchZkChildren");
     TreeMap<String,byte[]> orderedChildren = new TreeMap<>();
     updateLock.lockInterruptibly();
     try {
@@ -454,14 +471,14 @@ public class ZkDistributedQueue implements DistributedQueue {
           if (!childName.startsWith(PREFIX)) {
 
             // responses can be written to same queue with different naming scheme
-            if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}, prefix={}", childName, PREFIX);
+            if (log.isDebugEnabled()) log.debug("Filtering child out by prefix name=: {}, prefix={}", childName, PREFIX);
             continue;
           }
           if (acceptFilter != null && acceptFilter.test(childName)) {
             if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
             continue;
           }
-
+          if (log.isDebugEnabled()) log.debug("found: {}", childName);
           orderedChildren.put(childName, entry.getValue());
         }
       }
@@ -472,31 +489,33 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
 
     if (!orderedChildren.isEmpty()) {
+      if (log.isDebugEnabled()) log.debug("found children from fetch {}", orderedChildren.size());
       return orderedChildren;
     }
-
+    if (log.isDebugEnabled()) log.debug("found no children to fetch");
     TreeMap<String,byte[]> remoteKnownChildren = new TreeMap<>();
     try {
       List<String> childNames = zookeeper.getChildren(dir, watcher, true);
       stats.setQueueLength(childNames.size());
       for (String childName : childNames) {
-        if (log.isDebugEnabled()) log.debug("Examine child: {} out of {} {}", childName, childNames.size(), acceptFilter);
-        remoteKnownChildren.put(childName, null);
+        if (log.isDebugEnabled()) log.debug("Examine child: {} out of children={} acceptFilter={}", childName, childNames.size(), acceptFilter);
         // Check format
-        if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
-
+        if (!childName.startsWith(PREFIX)) {
           // responses can be written to same queue with different naming scheme
-          if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
+          if (log.isDebugEnabled()) log.debug("Excluding child by prefix: {}", childName);
           continue;
         }
+        remoteKnownChildren.put(childName, null);
         if (acceptFilter != null && acceptFilter.test(childName)) {
-          if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
+          if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", childName);
           continue;
         }
         if (log.isDebugEnabled()) log.debug("Add child to fetched children: {}", childName);
         orderedChildren.put(childName, null);
       }
+      if (log.isDebugEnabled()) log.debug("found {} remote children", remoteKnownChildren.size());
       updateKnownChildren(remoteKnownChildren);
+      if (log.isDebugEnabled()) log.debug("returning {} matched children", orderedChildren.size());
       return orderedChildren;
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
@@ -505,20 +524,20 @@ public class ZkDistributedQueue implements DistributedQueue {
   }
 
   private void updateKnownChildren(TreeMap<String,byte[]> children) {
+    if (log.isDebugEnabled()) log.debug("Update known children size={}", children.size());
+    TreeMap<String,byte[]> newKnownChildren = new TreeMap<>();
     updateLock.lock();
     try {
       Set<Map.Entry<String,byte[]>> entrySet = children.entrySet();
       for (Map.Entry<String,byte[]> entry : entrySet) {
         String childName = entry.getKey();
-        byte[] data = knownChildren == null ? null : knownChildren.get(childName);
-        if (data != null) {
-          if (childName.startsWith("/")) {
-            throw new IllegalArgumentException(childName);
-          }
-          children.put(childName, data);
+        byte[] data = entry.getValue();
+        if (data == null) {
+          data = knownChildren == null ? null : knownChildren.get(childName);
         }
+        newKnownChildren.put(childName, data);
       }
-      knownChildren = children;
+      knownChildren = newKnownChildren;
     } catch (Exception e) {
       log.error("", e);
     } finally {
@@ -542,12 +561,14 @@ public class ZkDistributedQueue implements DistributedQueue {
     TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
     try {
       if (foundChildren.size() == 0) {
-        waitForChildren(acceptFilter, foundChildren, waitNanos, timeout, watcher);
+        if (log.isDebugEnabled()) log.debug("found no children, watch for them  excludeFilter={}", acceptFilter);
+        waitForChildren(acceptFilter, foundChildren, timeout, watcher);
       }
 
       // Technically we could restart the method if we fasil to actually obtain any valid children
       // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
       // much more sophisticated waitNanos tracking.
+      if (log.isDebugEnabled()) log.debug("found children to process {}", foundChildren.size());
       result = Collections.synchronizedList(new ArrayList<>(foundChildren.size()));
       Set<String> dataPaths = new HashSet<>();
       for (Map.Entry<String,byte[]> child : foundChildren.entrySet()) {
@@ -555,13 +576,15 @@ public class ZkDistributedQueue implements DistributedQueue {
           break;
         }
 
-        byte[] data;
-        updateLock.lockInterruptibly();
-        try {
-          data = knownChildren.get(child.getKey());
-        } finally {
-          if (updateLock.isHeldByCurrentThread()) {
-            updateLock.unlock();
+        byte[] data = child.getValue();
+        if (data == null) {
+          updateLock.lockInterruptibly();
+          try {
+            data = knownChildren.get(child.getKey());
+          } finally {
+            if (updateLock.isHeldByCurrentThread()) {
+              updateLock.unlock();
+            }
           }
         }
 
@@ -570,20 +593,20 @@ public class ZkDistributedQueue implements DistributedQueue {
           dataPaths.add(dir + "/" + child.getKey());
           if (log.isDebugEnabled()) log.debug("get data for child={}", child.getKey());
         } else {
+          if (log.isDebugEnabled()) log.debug("found data locally already {}", child.getKey());
           result.add(new Pair<>(child.getKey(), data));
         }
       }
 
+      if (log.isDebugEnabled()) log.debug("fetch data for paths {}", dataPaths);
       Map<String,byte[]> dataMap = zookeeper.getData(dataPaths);
       updateLock.lockInterruptibly();
+      List<Pair<String,byte[]>> finalResult = result;
       try {
-        List<Pair<String,byte[]>> finalResult = result;
         dataMap.forEach((k, bytes) -> {
           finalResult.add(new Pair<>(k, bytes));
           if (bytes != null) {
             knownChildren.put(new File(k).getName(), bytes);
-          } else {
-            knownChildren.remove(new File(k).getName());
           }
         });
       } finally {
@@ -591,79 +614,77 @@ public class ZkDistributedQueue implements DistributedQueue {
           updateLock.unlock();
         }
       }
-
-      return new ArrayList<>(result);
+      if (log.isDebugEnabled()) log.debug("peek elements returning {} nodes", finalResult.size());
+      return new ArrayList<>(finalResult);
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
       throw new AlreadyClosedException(e);
     }
   }
 
-  private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
-    if (log.isDebugEnabled()) log.debug("wait for children ... {}", waitNanos);
+  private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
+    if (log.isDebugEnabled()) log.debug("wait for children ... {}ms", timeout.getInterval(TimeUnit.MILLISECONDS));
+
+    updateLock.lockInterruptibly();
     try {
+      for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+        if (!child.getKey().startsWith(PREFIX) && (acceptFilter == null || !acceptFilter.test(child.getKey()))) {
+          foundChildren.put(child.getKey(), child.getValue());
+        }
+      }
+    } finally {
+      if (updateLock.isHeldByCurrentThread()) {
+        updateLock.unlock();
+      }
+    }
+    if (!foundChildren.isEmpty()) {
+      if (log.isDebugEnabled()) log.debug("Found new children ... {}", foundChildren.size());
+      return;
+    }
+    if (timeout.hasTimedOut()) {
+      if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
+      return;
+    }
+    TreeMap<String,byte[]> fc = null;
+    while (fc == null || fc.isEmpty()) {
+      fc = fetchZkChildren(watcher, acceptFilter);
+      if (!fc.isEmpty()) {
+        foundChildren.putAll(fc);
+        return;
+      }
       updateLock.lockInterruptibly();
       try {
+        try {
+          changed.await(Math.min(timeout.getInterval(TimeUnit.MILLISECONDS), 5000), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          ParWork.propagateInterrupt(e);
+        }
+        if (zookeeper.isClosed() || !zookeeper.isConnected()) {
+          throw new AlreadyClosedException();
+        }
+        if (timeout.hasTimedOut()) {
+          return;
+        }
         for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
           if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
             foundChildren.put(child.getKey(), child.getValue());
           }
         }
+        if (!foundChildren.isEmpty()) {
+          try {
+            if (log.isDebugEnabled()) log.debug("Remove watches for {}");
+            zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+          } catch (Exception e) {
+            log.info(e.getMessage());
+          }
+          return;
+        }
+
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
           updateLock.unlock();
         }
       }
-      if (!foundChildren.isEmpty()) {
-        return;
-      }
-      if (waitNanos <= 0) {
-        if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
-        return;
-      }
-      TreeMap<String,byte[]> fc = null;
-      while (fc == null || fc.isEmpty()) {
-        if (watcher.fired) {
-          watcher.fired = false;
-          fc = fetchZkChildren(watcher, acceptFilter);
-          if (!fc.isEmpty()) {
-            foundChildren.putAll(fc);
-            return;
-          }
-        }
-        updateLock.lockInterruptibly();
-        try {
-          try {
-            changed.await(10, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException e) {
-            ParWork.propagateInterrupt(e);
-          }
-          if (zookeeper.isClosed() || !zookeeper.isConnected()) {
-            throw new AlreadyClosedException();
-          }
-          if (timeout.hasTimedOut()) {
-            return;
-          }
-          for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
-            if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
-              foundChildren.put(child.getKey(), child.getValue());
-            }
-          }
-          if (!foundChildren.isEmpty()) {
-            return;
-          }
-        } finally {
-          if (updateLock.isHeldByCurrentThread()) {
-            updateLock.unlock();
-          }
-        }
-      }
-    } finally {
-      try {
-        zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
-      } catch (Exception e) {
-        log.info(e.getMessage());
-      }
     }
   }
 
@@ -672,10 +693,10 @@ public class ZkDistributedQueue implements DistributedQueue {
    *
    * @return the data at the head of the queue.
    */
-  private byte[] firstElement() throws KeeperException {
+  private byte[] firstElement(Predicate<String> acceptFilter) throws KeeperException {
     try {
 
-      Map.Entry<String,byte[]> firstChild = firstChild(false);
+      Map.Entry<String,byte[]> firstChild = firstChild(false, acceptFilter);
       if (firstChild == null) {
         return null;
       }
@@ -724,9 +745,9 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
   }
 
-  private byte[] removeFirst() throws KeeperException {
+  private byte[] removeFirst(Predicate<String> acceptFilter) throws KeeperException {
     try {
-      Map.Entry<String,byte[]> firstChild = firstChild(true);
+      Map.Entry<String,byte[]> firstChild = firstChild(true, acceptFilter);
       if (firstChild == null) {
         return null;
       }
@@ -734,10 +755,7 @@ public class ZkDistributedQueue implements DistributedQueue {
       byte[] data;
       updateLock.lockInterruptibly();
       try {
-        data = knownChildren.get(firstChild.getKey());
-        if (data != null) {
-          return data;
-        }
+        data = knownChildren.remove(firstChild.getKey());
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
           updateLock.unlock();
@@ -746,7 +764,9 @@ public class ZkDistributedQueue implements DistributedQueue {
 
       try {
         String path = dir + "/" + firstChild.getKey();
-        byte[] result = zookeeper.getData(path, null, null, true);
+        if (data == null) {
+          data = zookeeper.getData(path, null, null, true);
+        }
         zookeeper.delete(path, -1, true);
         updateLock.lockInterruptibly();
         try {
@@ -757,7 +777,7 @@ public class ZkDistributedQueue implements DistributedQueue {
             updateLock.unlock();
           }
         }
-        return result;
+        return data;
       } catch (KeeperException.NoNodeException e) {
         return null;
       }
@@ -772,7 +792,6 @@ public class ZkDistributedQueue implements DistributedQueue {
   }
 
   @VisibleForTesting class ChildWatcher implements Watcher {
-    volatile boolean fired = false;
 
     @Override
     public void process(WatchedEvent event) {
@@ -787,13 +806,14 @@ public class ZkDistributedQueue implements DistributedQueue {
         updateLock.lock();
         try {
           fetchZkChildren(null, null);
-          fired = true;
           changed.signalAll();
         } catch (KeeperException | InterruptedException e) {
           log.error("", e);
         } finally {
           updateLock.unlock();
         }
+      } else {
+
       }
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 77f05f8..aa9bc6c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -25,6 +25,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -109,7 +111,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
     try {
       List<ZkNodeProps> replicas = getReplicasForSlice(collectionName, slice);
-      CountDownLatch cleanupLatch = new CountDownLatch(replicas.size());
+
       for (ZkNodeProps r : replicas) {
         final ZkNodeProps replica = r.plus(message.getProperties()).plus("parallel", "true").plus(ASYNC, asyncId);
         if (log.isInfoEnabled()) {
@@ -119,7 +121,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
         NamedList deleteResult = new NamedList();
         try {
           ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, replica, deleteResult, () -> {
-            cleanupLatch.countDown();
+
             if (deleteResult.get("failure") != null) {
               synchronized (results) {
                 results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
@@ -136,24 +138,37 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
           });
         } catch (KeeperException e) {
           log.warn("Error deleting replica: {}", r, e);
-          cleanupLatch.countDown();
           throw e;
         } catch (Exception e) {
           ParWork.propagateInterrupt(e);
           log.warn("Error deleting replica: {}", r, e);
-          cleanupLatch.countDown();
           throw e;
         }
       }
       log.debug("Waiting for delete shard action to complete");
-      cleanupLatch.await(1, TimeUnit.MINUTES);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
           collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
       ZkStateReader zkStateReader = ocmh.zkStateReader;
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
 
-      zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
+
+      AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+
+      if (results.get("failure") == null && results.get("exception") == null) {
+        response.asyncFinalRunner = new Runnable() {
+          @Override public void run() {
+            try {
+              zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, (c) -> c != null && c.getSlice(sliceId) == null);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt(); // restore the flag rather than swallow the interrupt
+              log.warn("Interrupted waiting for shard {} to be removed from collection {}", sliceId, collectionName, e);
+            } catch (TimeoutException e) {
+              log.warn("Timed out waiting for shard {} to be removed from collection {}", sliceId, collectionName, e);
+            }
+          }
+        };
+      }
 
       log.info("Successfully deleted collection: {} , shard: {}", collectionName, sliceId);
     } catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 988ccea..701ea53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -132,13 +132,6 @@ public class ZkStateWriter {
 
           reader.getZkClient().clean(path);
 
-          TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-          timeout.waitFor("", () -> {
-            DocCollection rc = reader.getClusterState().getCollectionOrNull(name);
-            if (rc == null) return true;
-            return false;
-          });
-
           LinkedHashMap<String,ClusterState.CollectionRef> collStates = new LinkedHashMap<>(prevState.getCollectionStates());
           collStates.remove(name);
           prevState = new ClusterState(prevState.getLiveNodes(), collStates, prevState.getZNodeVersion());
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index fcd5ab8..d694b2d 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -3110,14 +3110,14 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       File dataDir = cd.getInstanceDir().resolve(cd.getDataDir()).toFile();
       try {
         FileUtils.deleteDirectory(dataDir);
-      } catch (IOException e) {
+      } catch (Exception e) {
         log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir.getAbsolutePath(), e);
       }
     }
     if (deleteInstanceDir) {
       try {
         FileUtils.deleteDirectory(cd.getInstanceDir().toFile());
-      } catch (IOException e) {
+      } catch (Exception e) {
         log.error("Failed to delete instance dir for unloaded core: {} dir: {}", cd.getName(), cd.getInstanceDir(), e);
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index e0a6fe2..f8dcac0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -77,14 +77,11 @@ public class DeleteShardTest extends SolrCloudTestCase {
 
     // Can delete an INATIVE shard
     CollectionAdminRequest.deleteShard(collection, "shard1").process(cluster.getSolrClient());
-    waitForState("Expected 'shard1' to be removed", collection, (n, c) -> {
-      return c.getSlice("shard1") == null;
-    });
 
     // Can delete a shard under construction
     setSliceState(collection, "shard2", Slice.State.CONSTRUCTION);
 
-    waitForState("Expected 'shard2' to be removed", collection, (n, c) -> {
+    waitForState("Expected 'shard2' to be under CONSTRUCTION", collection, (n, c) -> {
       return c != null && c.getSlice("shard2") != null && c.getSlice("shard2").getState() == State.CONSTRUCTION;
     });
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 71daf62..8d4b5d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -58,38 +58,38 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     DistributedQueue dq = makeDistributedQueue(dqZNode);
 
     // basic ops
-    assertNull(dq.poll());
+    assertNull(dq.poll(null));
     try {
-      dq.remove();
+      dq.remove(null);
       fail("NoSuchElementException expected");
     } catch (NoSuchElementException expected) {
       // expected
     }
 
     dq.offer(data);
-    assertArrayEquals(dq.peek(100), data);
-    assertArrayEquals(dq.remove(), data);
-    assertNull(dq.poll());
+    assertArrayEquals(dq.peek(null, 100), data);
+    assertArrayEquals(dq.remove(null), data);
+    assertNull(dq.poll(null));
 
     dq.offer(data);
-    assertArrayEquals(dq.take(), data); // waits for data
-    assertNull(dq.poll());
+    assertArrayEquals(dq.take(null), data); // waits for data
+    assertNull(dq.poll(null));
 
     dq.offer(data);
-    dq.peek(true); // wait until data is definitely there before calling remove
-    assertArrayEquals(dq.remove(), data);
-    assertNull(dq.poll());
+    dq.peek(null, true); // wait until data is definitely there before calling remove
+    assertArrayEquals(dq.remove(null), data);
+    assertNull(dq.poll(null));
 
     // should block until the background thread makes the offer
     (new QueueChangerThread(dq, 200)).start();
-    assertNotNull(dq.peek(true));
-    assertNotNull(dq.remove());
-    assertNull(dq.poll());
+    assertNotNull(dq.peek(null, true));
+    assertNotNull(dq.remove(null));
+    assertNull(dq.poll(null));
 
     // timeout scenario ... background thread won't offer until long after the peek times out
     QueueChangerThread qct = new QueueChangerThread(dq, 500);
     qct.start();
-    assertNull(dq.peek(100));
+    assertNull(dq.peek(null, 100));
     qct.join();
   }
 
@@ -105,17 +105,17 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     producer2.offer(data);
     producer.offer(data);
     producer.offer(data);
-    consumer.poll();
+    consumer.poll(null);
 
     assertEquals(2, consumer.getZkStats().getQueueLength());
     producer.offer(data);
     producer2.offer(data);
-    consumer.poll();
+    consumer.poll(null);
 
     // DQ still have elements in their queue, so we should not fetch elements path from Zk
     assertEquals(1, consumer.getZkStats().getQueueLength());
-    consumer.poll();
-    consumer.peek();
+    consumer.poll(null);
+    consumer.peek(null);
     assertEquals(2, consumer.getZkStats().getQueueLength());
   }
 
@@ -127,8 +127,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
 
     ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
 
-    assertNull(dq.peek());
-    Future<String> future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
+    assertNull(dq.peek(null));
+    Future<String> future = testExecutor.submit(() -> new String(dq.peek(null, true), UTF8));
     try {
       future.get(1000, TimeUnit.MILLISECONDS);
       fail("TimeoutException expected");
@@ -139,10 +139,10 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     // Ultimately trips the watcher, triggering child refresh
     dq.offer(testData.getBytes(UTF8));
     assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
-    assertNotNull(dq.poll());
+    assertNotNull(dq.poll(null));
 
     // After draining the queue, a watcher should be set.
-    assertNull(dq.peek(100));
+    assertNull(dq.peek(null, 100));
     
 
    // assertFalse(dq.isDirty());
@@ -153,7 +153,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     assertEquals(0, dq.watcherCount());
 
     // Rerun the earlier test make sure updates are still seen, post reconnection.
-    future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
+    future = testExecutor.submit(() -> new String(dq.peek(null, true), UTF8));
     try {
       future.get(1000, TimeUnit.MILLISECONDS);
       fail("TimeoutException expected");
@@ -164,8 +164,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     // Ultimately trips the watcher, triggering child refresh
     dq.offer(testData.getBytes(UTF8));
     assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
-    assertNotNull(dq.poll());
-    assertNull(dq.poll());
+    assertNotNull(dq.poll(null));
+    assertNull(dq.poll(null));
   }
 
   @Test
@@ -176,21 +176,21 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     assertEquals(1, dq.watcherCount());
     assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
     assertEquals(1, dq.watcherCount());
-    assertNull(dq.peek());
+    assertNull(dq.peek(null));
     assertEquals(1, dq.watcherCount());
-    assertNull(dq.peek(1));
+    assertNull(dq.peek(null, 1));
     assertEquals(1, dq.watcherCount());
 
     dq.offer("hello world".getBytes(UTF8));
-    assertNotNull(dq.peek()); // synchronously available
+    assertNotNull(dq.peek(null)); // synchronously available
     // dirty and watcher state indeterminate here, race with watcher
     Thread.sleep(100); // watcher should have fired now
-    assertNotNull(dq.peek());
+    assertNotNull(dq.peek(null));
     // in case of race condition, childWatcher is kicked off after peek()
     if (dq.watcherCount() == 0) {
-      dq.poll();
+      dq.poll(null);
       dq.offer("hello world".getBytes(UTF8));
-      dq.peek();
+      dq.peek(null);
     }
     assertEquals(1, dq.watcherCount());
     assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
@@ -205,8 +205,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     for (int i = 0; i < 100; i++) {
       byte[] data = String.valueOf(i).getBytes(UTF8);
       dq.offer(data);
-      assertNotNull(dq.peek());
-      dq.poll();
+      assertNotNull(dq.peek(null));
+      dq.poll(null);
       dq.peekElements(1, 1, s -> true);
     }
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index d52dc89..0f7e2e0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -842,14 +842,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
       while(!timeOut.hasTimedOut()) {
-        if (q.peek() == null) {
+        if (q.peek(null) == null) {
           break;
         }
         Thread.sleep(50);
       }
 
-      assertTrue(showQpeek(workQueue), workQueue.peek() == null);
-      assertTrue(showQpeek(q),  q.peek() == null);
+      assertTrue(showQpeek(workQueue), workQueue.peek(null) == null);
+      assertTrue(showQpeek(q),  q.peek(null) == null);
     } finally {
       close(overseerClient);
       close(reader);
@@ -860,7 +860,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
     if (q == null) {
       return "";
     }
-    byte[] bytes = q.peek();
+    byte[] bytes = q.peek(null);
     if (bytes == null) {
       return "";
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
index 9a31bff..1a6aed9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
@@ -26,17 +26,17 @@ import org.apache.solr.common.util.Pair;
  * Distributed queue component. Methods largely follow those in {@link java.util.Queue}.
  */
 public interface DistributedQueue {
-  byte[] peek() throws Exception;
+  byte[] peek(Predicate<String> acceptFilter) throws Exception;
 
-  byte[] peek(boolean block) throws Exception;
+  byte[] peek(Predicate<String> acceptFilter, boolean block) throws Exception;
 
-  byte[] peek(long wait) throws Exception;
+  byte[] peek(Predicate<String> acceptFilter, long wait) throws Exception;
 
-  byte[] poll() throws Exception;
+  byte[] poll(Predicate<String> acceptFilter) throws Exception;
 
-  byte[] remove() throws Exception;
+  byte[] remove(Predicate<String> acceptFilter) throws Exception;
 
-  byte[] take() throws Exception;
+  byte[] take(Predicate<String> acceptFilter) throws Exception;
 
   void offer(byte[] data) throws Exception;
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 5e59af3..6754d70 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -17,6 +17,7 @@
 package org.apache.solr.client.solrj.request;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -53,6 +54,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
@@ -75,7 +78,7 @@ import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTIO
  * @since solr 4.5
  */
 public abstract class CollectionAdminRequest<T extends CollectionAdminResponse> extends SolrRequest<T> implements V2RequestSupport, MapWriter {
-
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   /**
    * The set of modifiable collection properties
    */
@@ -452,12 +455,12 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     private Create(String collection, String config, String routerName, Integer numShards, String shards, Integer numNrtReplicas, Integer  numTlogReplicas, Integer numPullReplicas) {
       super(CollectionAction.CREATE, SolrIdentifierValidator.validateCollectionName(collection));
       // NOTE: there's very little we can assert about the args because nothing but "collection" is required by the server
-      if ((null != shards) && (null != numShards && numShards != 0)) {
+      if ((null != shards) && (null != numShards)) {
         throw new IllegalArgumentException("Can not specify both a numShards and a list of shards");
       }
       this.configName = config;
       this.routerName = routerName;
-      this.numShards = numShards;;
+      this.numShards = numShards;
       this.setShards(shards);
       this.nrtReplicas = numNrtReplicas;
       this.tlogReplicas = numTlogReplicas;
@@ -482,6 +485,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     }
 
     public int getTotaleReplicaCount() {
+      log.debug("nrtReplicas={} tlogReplicas={} pullReplicas={}", nrtReplicas, tlogReplicas, pullReplicas); // diagnostic only; INFO in a getter is noise
       int cnt = (nrtReplicas == null ? 0 : nrtReplicas) + (tlogReplicas == null ? 0 : tlogReplicas) + (pullReplicas == null ? 0 : pullReplicas);
       return cnt;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java b/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
index 2256771..130e94d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
@@ -26,11 +26,13 @@ public class TimeOut {
 
   private final long timeoutAt, startTime;
   private final TimeSource timeSource;
+  private final long interval;
 
   public TimeOut(long interval, TimeUnit unit, TimeSource timeSource) {
     this.timeSource = timeSource;
     startTime = timeSource.getTimeNs();
-    this.timeoutAt = startTime + NANOSECONDS.convert(interval, unit);
+    this.interval = NANOSECONDS.convert(interval, unit);
+    this.timeoutAt = startTime + this.interval; // use the nanos field, not the shadowing 'interval' param (in 'unit')
   }
 
   public boolean hasTimedOut() {
@@ -41,6 +43,10 @@ public class TimeOut {
     timeSource.sleep(ms);
   }
 
+  public long getInterval(final TimeUnit unit) {
+    return unit.convert(this.interval, TimeUnit.NANOSECONDS);
+  }
+
   public long timeLeft(TimeUnit unit) {
     return unit.convert(timeoutAt - timeSource.getTimeNs(), NANOSECONDS);
   }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index c4ea258..0b51b90 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -926,7 +926,7 @@ public class MiniSolrCloudCluster {
   }
 
   public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
-    log.info("waitForActiveCollection: {}", collection);
+    log.info("waitForActiveCollection: {} shards={} replicas={}, exact={}", collection, shards, totalReplicas, exact);
     CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
 
     AtomicReference<DocCollection> state = new AtomicReference<>();