You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/06/17 16:59:53 UTC

[lucene-solr] branch master updated: SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a97486  SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.
5a97486 is described below

commit 5a974860fa83408a86ca64b417f3111b037da7eb
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Jun 17 09:59:43 2019 -0700

    SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.
    
    Also add Predicate<DocCollection> equivalents for callers that don't care about liveNodes.
---
 solr/CHANGES.txt                                   |   4 +
 .../java/org/apache/solr/cloud/ZkController.java   |  14 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../cloud/api/collections/DeleteCollectionCmd.java |   2 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../OverseerCollectionMessageHandler.java          |   2 +-
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |   6 +-
 .../cloud/TestWaitForStateWithJettyShutdowns.java  | 155 +++++++++++
 .../solr/common/cloud/ZkStateReaderAccessor.java   |   2 +-
 .../client/solrj/impl/BaseCloudSolrClient.java     |  68 ++++-
 .../common/cloud/CollectionStatePredicate.java     |  10 +-
 .../solr/common/cloud/CollectionStateWatcher.java  |   9 +-
 ...StateWatcher.java => DocCollectionWatcher.java} |  17 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 202 ++++++++++++--
 .../common/cloud/TestCollectionStateWatchers.java  | 180 ++++++++-----
 .../common/cloud/TestDocCollectionWatcher.java     | 291 +++++++++++++++++++++
 .../solr/cloud/AbstractDistribZkTestBase.java      |   6 +-
 .../solr/cloud/AbstractFullDistribZkTestBase.java  |   4 +-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |   2 +-
 19 files changed, 856 insertions(+), 122 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2f7ab1e..061d2b3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -159,6 +159,10 @@ Bug Fixes
 
 * SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev)
 
+* SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and
+  CloudSolrClient to be triggered on liveNode changes.  Also add Predicate<DocCollection> equivalents
+  for callers that don't care about liveNodes. (hossman)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8e90dd8..af298c5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -68,12 +68,12 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.BeforeReconnect;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.ConnectionManager;
 import org.apache.solr.common.cloud.DefaultConnectionStrategy;
 import org.apache.solr.common.cloud.DefaultZkACLProvider;
 import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.LiveNodesListener;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.Replica;
@@ -1052,7 +1052,7 @@ public class ZkController implements Closeable {
 
     CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
     for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
-      zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> {
+      zkStateReader.registerDocCollectionWatcher(collectionWithLocalReplica, (collectionState) -> {
         if (collectionState == null)  return false;
         boolean foundStates = true;
         for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
@@ -1194,7 +1194,7 @@ public class ZkController implements Closeable {
       // check replica's existence in clusterstate first
       try {
         zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
-            TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+            TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
       } catch (TimeoutException e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
       }
@@ -1301,7 +1301,7 @@ public class ZkController implements Closeable {
       // make sure we have an update cluster state right away
       zkStateReader.forceUpdateCollection(collection);
       // the watcher is added to a set so multiple calls of this method will left only one watcher
-      zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
+      zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
           new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
       return shardId;
     } finally {
@@ -1845,7 +1845,7 @@ public class ZkController implements Closeable {
       AtomicReference<String> errorMessage = new AtomicReference<>();
       AtomicReference<DocCollection> collectionState = new AtomicReference<>();
       try {
-        zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (n, c) -> {
+        zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
           collectionState.set(c);
           if (c == null)
             return false;
@@ -2545,7 +2545,7 @@ public class ZkController implements Closeable {
     };
   }
 
-  private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
+  private class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher {
     String coreNodeName;
     String shard;
     String coreName;
@@ -2558,7 +2558,7 @@ public class ZkController implements Closeable {
 
     @Override
     // synchronized due to SOLR-11535
-    public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+    public synchronized boolean onStateChanged(DocCollection collectionState) {
       if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
 
       boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index dfd5a21..372ae53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
             CollectionAdminParams.COLOCATED_WITH, collectionName);
         ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
         try {
-          zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
+          zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
         } catch (TimeoutException e) {
           log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
           // maybe the overseer queue is backed up, we don't want to fail the create request
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 2054258..6c0b147 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -134,7 +134,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
 
       // wait for a while until we don't see the collection
-      zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null);
+      zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
 
       // we can delete any remaining unique aliases
       if (!aliasReferences.isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index e38aa4a..c5c8e99 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -134,7 +134,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       ZkStateReader zkStateReader = ocmh.zkStateReader;
       ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
 
-      zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null);
+      zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
 
       log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
     } catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 81407c6..d8be1f2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -416,7 +416,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
   boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
     try {
-      zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> {
+      zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
           if (c == null)
             return true;
           Slice slice = c.getSlice(shard);
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 62a891a..1050b4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -35,8 +35,8 @@ import org.apache.solr.client.solrj.request.CoreStatus;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -115,7 +115,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
     JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
     ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
-    Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+    Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
     CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
         .process(cluster.getSolrClient());
     waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
@@ -221,7 +221,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     Replica replica = getRandomReplica(shard);
     JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
     ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
-    Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+    Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
 
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
new file mode 100644
index 0000000..4e21fb3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import java.util.Set;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+import static org.apache.solr.cloud.SolrCloudTestCase.clusterShape;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public void testWaitForStateAfterShutDown() throws Exception {
+    final String col_name = "test_col";
+    final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
+      (1, createTempDir(), JettyConfig.builder().build());
+    try {
+      log.info("Create our collection");
+      CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+      
+      log.info("Sanity check that our collection has come online");
+      cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, clusterShape(1, 1));
+                                           
+      log.info("Shutdown 1 node");
+      final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
+      nodeToStop.stop();
+      log.info("Wait to confirm our node is fully shutdown");
+      cluster.waitForJettyToStop(nodeToStop);
+
+      // now that we're confident that node has stopped, check if a waitForState
+      // call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
+      log.info("Now check if waitForState will recognize we already have the exepcted state");
+      cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
+                                           
+      
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public void testWaitForStateBeforeShutDown() throws Exception {
+    final String col_name = "test_col";
+    final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool
+      (1, new DefaultSolrThreadFactory("background_executor"));
+    final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
+      (1, createTempDir(), JettyConfig.builder().build());
+    try {
+      log.info("Create our collection");
+      CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+      
+      log.info("Sanity check that our collection has come online");
+      cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS,
+                                           SolrCloudTestCase.clusterShape(1, 1));
+
+
+      // HACK implementation detail...
+      //
+      // we know that in the current implementation, waitForState invokes the predicate twice
+      // independently of the current state of the collection and/or whether the predicate succeeds.
+      // If this implementation detail changes (i.e. so that it's only invoked once),
+      // then this number needs to change -- but the test fundamentally depends on the implementation
+      // calling the predicate at least once, which should also be necessary for any future impl
+      // (to verify that it didn't "miss" the state change when creating the watcher)
+      final CountDownLatch latch = new CountDownLatch(2);
+      
+      final Future<?> backgroundWaitForState = executor.submit
+        (() -> {
+          try {
+            cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
+                                                 new LatchCountingPredicateWrapper(latch,
+                                                                                   clusterShape(1, 0)));
+          } catch (Exception e) {
+            log.error("background thread got exception", e);
+            throw new RuntimeException(e);
+          }
+          return;
+        }, null);
+      
+      log.info("Awaiting latch...");
+      if (! latch.await(120, TimeUnit.SECONDS)) {
+        fail("timed out Waiting a ridiculous amount of time for the waitForState latch -- did impl change?");
+      }
+
+      log.info("Shutdown 1 node");
+      final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
+      nodeToStop.stop();
+      log.info("Wait to confirm our node is fully shutdown");
+      cluster.waitForJettyToStop(nodeToStop);
+
+      // now that we're confident that node has stopped, check if a waitForState
+      // call will detect the missing replica -- shouldn't need long wait times...
+      log.info("Checking Future result to see if waitForState finished successfully");
+      try {
+        backgroundWaitForState.get();
+      } catch (ExecutionException e) {
+        log.error("background waitForState exception", e);
+        throw e;
+      }
+      
+    } finally {
+      ExecutorUtil.shutdownAndAwaitTermination(executor);
+      cluster.shutdown();
+    }
+  }
+    
+  public final class LatchCountingPredicateWrapper implements CollectionStatePredicate {
+    private final CountDownLatch latch;
+    private final CollectionStatePredicate inner;
+    public LatchCountingPredicateWrapper(final CountDownLatch latch, final CollectionStatePredicate inner) {
+      this.latch = latch;
+      this.inner = inner;
+    }
+    public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
+      final boolean result = inner.matches(liveNodes, collectionState);
+      log.info("Predicate called: result={}, (pre)latch={}, liveNodes={}, state={}",
+               result, latch.getCount(), liveNodes, collectionState);
+      latch.countDown();
+      return result;
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
index 0853ee0..b40a7a2 100644
--- a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
+++ b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
@@ -28,7 +28,7 @@ public class ZkStateReaderAccessor {
     this.zkStateReader = zkStateReader;
   }
 
-  public Set<CollectionStateWatcher> getStateWatchers(String collection) {
+  public Set<DocCollectionWatcher> getStateWatchers(String collection) {
     return zkStateReader.getStateWatchers(collection);
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 02fbb70..82645fd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import org.apache.solr.client.solrj.ResponseParser;
@@ -62,6 +63,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
@@ -362,11 +364,21 @@ public abstract class BaseCloudSolrClient extends SolrClient {
   }
 
   /**
-   * Block until a collection state matches a predicate, or a timeout
+   * Block until a CollectionStatePredicate returns true, or the wait times out
    *
+   * <p>
    * Note that the predicate may be called again even after it has returned true, so
    * implementors should avoid changing state within the predicate call itself.
+   * </p>
+   *
+   * <p>
+   * This implementation utilizes {@link CollectionStateWatcher} internally. 
+   * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate} 
+   * instead
+   * </p>
    *
+   * @see #waitForState(String, long, TimeUnit, Predicate)
+   * @see #registerCollectionStateWatcher
    * @param collection the collection to watch
    * @param wait       how long to wait
    * @param unit       the units of the wait parameter
@@ -379,14 +391,45 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     getClusterStateProvider().connect();
     assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
   }
+  /**
+   * Block until a Predicate returns true, or the wait times out
+   *
+   * <p>
+   * Note that the predicate may be called again even after it has returned true, so
+   * implementors should avoid changing state within the predicate call itself.
+   * </p>
+   *
+   * @see #registerDocCollectionWatcher
+   * @param collection the collection to watch
+   * @param wait       how long to wait
+   * @param unit       the units of the wait parameter
+   * @param predicate  a {@link Predicate} to test against the {@link DocCollection}
+   * @throws InterruptedException on interrupt
+   * @throws TimeoutException     on timeout
+   */
+  public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+      throws InterruptedException, TimeoutException {
+    getClusterStateProvider().connect();
+    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+  }
 
   /**
    * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
+   * <em>or</em> the set of live nodes changes.
+   *
+   * <p>
+   * The Watcher will automatically be removed when its 
+   * <code>onStateChanged</code> returns <code>true</code>
+   * </p>
    *
-   * Note that the watcher is unregistered after it has been called once.  To make a watcher persistent,
-   * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
-   * call
+   * <p>
+   * This implementation utilizes {@link ZkStateReader#registerCollectionStateWatcher} internally.
+   * Callers that don't care about liveNodes are encouraged to use a {@link DocCollectionWatcher} 
+   * instead
+   * </p>
    *
+   * @see #registerDocCollectionWatcher(String, DocCollectionWatcher)
+   * @see ZkStateReader#registerCollectionStateWatcher
    * @param collection the collection to watch
    * @param watcher    a watcher that will be called when the state changes
    */
@@ -394,6 +437,23 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     getClusterStateProvider().connect();
     assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
   }
+  
+  /**
+   * Register a DocCollectionWatcher to be called when the cluster state for a collection changes.
+   *
+   * <p>
+   * The Watcher will automatically be removed when its 
+   * <code>onStateChanged</code> returns <code>true</code>
+   * </p>
+   *
+   * @see ZkStateReader#registerDocCollectionWatcher
+   * @param collection the collection to watch
+   * @param watcher    a watcher that will be called when the state changes
+   */
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
+    getClusterStateProvider().connect();
+    assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher);
+  }
 
   private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
     UpdateRequest updateRequest = (UpdateRequest) request;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
index 37b00d7..a91a499 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
@@ -19,23 +19,27 @@ package org.apache.solr.common.cloud;
 
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 /**
- * Interface to determine if a collection state matches a required state
+ * Interface to determine if a set of liveNodes and a collection's state match some expectations.
  *
  * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
+ * @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
  */
 public interface CollectionStatePredicate {
 
   /**
-   * Check the collection state matches a required state
-   *
+   * Check if the set of liveNodes <em>and</em> the collection state matches a required state
+   * <p>
    * Note that both liveNodes and collectionState should be consulted to determine
    * the overall state.
+   * </p>
    *
    * @param liveNodes the current set of live nodes
    * @param collectionState the latest collection state, or null if the collection
    *                        does not exist
+   * @return true if the input matches the requirements of this predicate
    */
   boolean matches(Set<String> liveNodes, DocCollection collectionState);
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
index d75823c..63bfaf9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
@@ -21,21 +21,24 @@ import java.util.Set;
 
 /**
  * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
- * and called whenever the collection state changes.
+ * and called whenever there is a change in the collection state <em>or</em> in the list of liveNodes.
+ * 
+ * @see DocCollectionWatcher
  */
 public interface CollectionStateWatcher {
 
   /**
-   * Called when the collection we are registered against has a change of state.
+   * Called when either the collection we are registered against has a change of state <em>or</em> there is a change to the live nodes of our collection.
    *
+   * <p>
    * Note that, due to the way Zookeeper watchers are implemented, a single call may be
    * the result of several state changes. Also, multiple calls to this method can be made
    * with the same state, ie. without any new updates.
+   * </p>
    *
    * @param liveNodes       the set of live nodes
    * @param collectionState the new collection state (may be null if the collection has been
    *                        deleted)
-   *
    * @return true if the watcher should be removed
    */
   boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
similarity index 71%
copy from solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
copy to solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
index d75823c..0d65cfe 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
@@ -17,27 +17,24 @@
 
 package org.apache.solr.common.cloud;
 
-import java.util.Set;
-
 /**
- * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
- * and called whenever the collection state changes.
+ * Callback registered with {@link ZkStateReader#registerDocCollectionWatcher(String, DocCollectionWatcher)}
+ * and called whenever the DocCollection changes.
  */
-public interface CollectionStateWatcher {
+public interface DocCollectionWatcher {
 
   /**
    * Called when the collection we are registered against has a change of state.
    *
+   * <p>
    * Note that, due to the way Zookeeper watchers are implemented, a single call may be
    * the result of several state changes. Also, multiple calls to this method can be made
    * with the same state, ie. without any new updates.
+   * </p>
    *
-   * @param liveNodes       the set of live nodes
-   * @param collectionState the new collection state (may be null if the collection has been
-   *                        deleted)
-   *
+   * @param collection the new collection state (may be null if the collection has been deleted)
    * @return true if the watcher should be removed
    */
-  boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);
+  boolean onStateChanged(DocCollection collection);
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3af08d8..cd72203 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -185,7 +186,7 @@ public class ZkStateReader implements SolrCloseable {
 
   private final Runnable securityNodeListener;
 
-  private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
   private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
@@ -590,7 +591,7 @@ public class ZkStateReader implements SolrCloseable {
     notifyCloudCollectionsListeners();
 
     for (String collection : changedCollections) {
-      notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
+      notifyStateWatchers(collection, clusterState.getCollectionOrNull(collection));
     }
 
   }
@@ -1598,9 +1599,46 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /**
-   * Register a CollectionStateWatcher to be called when the state of a collection changes
+   * Register a CollectionStateWatcher to be called when the state of a collection changes
+   * <em>or</em> the set of live nodes changes.
+   *
+   * <p>
+   * The Watcher is invoked with the current state during registration (possibly more than once),
+   * and will automatically be removed when its <code>onStateChanged</code> returns <code>true</code>
+   * </p>
+   *
+   * <p>
+   * This method is just syntactic sugar for registering both a {@link DocCollectionWatcher} and
+   * a {@link LiveNodesListener}.  Callers that only care about one or the other (but not both) are
+   * encouraged to use the more specific register methods, as that may reduce the number of
+   * ZooKeeper watchers needed, and reduce the amount of network/cpu used.
+   * </p>
+   *
+   * @see #registerDocCollectionWatcher
+   * @see #registerLiveNodesListener
    */
   public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
+    final DocCollectionAndLiveNodesWatcherWrapper wrapper
+      = new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
+    
+    registerDocCollectionWatcher(collection, wrapper);
+    registerLiveNodesListener(wrapper);
+
+    DocCollection state = clusterState.getCollectionOrNull(collection);
+    if (stateWatcher.onStateChanged(liveNodes, state) == true) { // already satisfied: drop both halves of the wrapper
+      removeCollectionStateWatcher(collection, stateWatcher);
+    }
+  }
+
+  /**
+   * Register a DocCollectionWatcher to be called when the state of a collection changes
+   *
+   * <p>
+   * The Watcher is invoked with the current state during registration, and will automatically
+   * be removed when its <code>onStateChanged</code> returns <code>true</code>
+   * </p>
+   */
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
@@ -1616,17 +1654,27 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
-    if (stateWatcher.onStateChanged(liveNodes, state) == true) {
-      removeCollectionStateWatcher(collection, stateWatcher);
+    if (stateWatcher.onStateChanged(state) == true) { // initial call with the current (possibly null) state
+      removeDocCollectionWatcher(collection, stateWatcher);
     }
   }
 
   /**
    * Block until a CollectionStatePredicate returns true, or the wait times out
    *
+   * <p>
    * Note that the predicate may be called again even after it has returned true, so
    * implementors should avoid changing state within the predicate call itself.
+   * </p>
    *
+   * <p>
+   * This implementation utilizes {@link CollectionStateWatcher} internally. 
+   * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate} 
+   * instead
+   * </p>
+   * 
+   * @see #waitForState(String, long, TimeUnit, Predicate)
+   * @see #registerCollectionStateWatcher
    * @param collection the collection to watch
    * @param wait       how long to wait
    * @param unit       the units of the wait parameter
@@ -1665,13 +1713,60 @@ public class ZkStateReader implements SolrCloseable {
       waitLatches.remove(latch);
     }
   }
-
+  
   /**
-   * Block until a LiveNodesStatePredicate returns true, or the wait times out
+   * Block until a {@link Predicate} on the collection's {@link DocCollection} returns true, or the wait times out
    *
+   * <p>
    * Note that the predicate may be called again even after it has returned true, so
    * implementors should avoid changing state within the predicate call itself.
+   * </p>
    *
+   * @param collection the collection to watch
+   * @param wait       how long to wait
+   * @param unit       the units of the wait parameter
+   * @param predicate  the predicate to call on state changes (its argument may be null, e.g. if the collection has been deleted)
+   * @throws InterruptedException on interrupt
+   * @throws TimeoutException on timeout
+   */
+  public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+      throws InterruptedException, TimeoutException {
+
+    if (closed) {
+      throw new AlreadyClosedException();
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    waitLatches.add(latch);
+    AtomicReference<DocCollection> docCollection = new AtomicReference<>(); // last state seen, reported on timeout
+    DocCollectionWatcher watcher = (c) -> {
+      docCollection.set(c);
+      boolean matches = predicate.test(c);
+      if (matches)
+        latch.countDown();
+
+      return matches;
+    };
+    registerDocCollectionWatcher(collection, watcher);
+
+    try {
+      // wait for the watcher predicate to return true, or time out
+      if (!latch.await(wait, unit))
+        throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
+
+    }
+    finally { // deregister whether we matched, timed out, or were interrupted
+      removeDocCollectionWatcher(collection, watcher);
+      waitLatches.remove(latch);
+    }
+  }
+
+  /**
+   * Block until a LiveNodesStatePredicate returns true, or the wait times out
+   * <p>
+   * Note that the predicate may be called again even after it has returned true, so
+   * implementors should avoid changing state within the predicate call itself.
+   * </p>
    * @param wait       how long to wait
    * @param unit       the units of the wait parameter
    * @param predicate  the predicate to call on state changes
@@ -1713,14 +1808,35 @@ public class ZkStateReader implements SolrCloseable {
 
   /**
    * Remove a watcher from a collection's watch list.
-   *
+   * <p>
    * This allows Zookeeper watches to be removed if there is no interest in the
    * collection.
+   * </p>
    *
+   * @see #registerCollectionStateWatcher
    * @param collection the collection
    * @param watcher    the watcher
    */
   public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
+    final DocCollectionAndLiveNodesWatcherWrapper wrapper
+      = new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher); // equals() the registered wrapper (same collection + delegate)
+
+    removeDocCollectionWatcher(collection, wrapper);
+    removeLiveNodesListener(wrapper); // undo both halves of registerCollectionStateWatcher
+  }
+  
+  /**
+   * Remove a watcher from a collection's watch list.
+   * <p>
+   * This allows Zookeeper watches to be removed if there is no interest in the
+   * collection.
+   * </p>
+   *
+   * @see #registerDocCollectionWatcher
+   * @param collection the collection
+   * @param watcher    the watcher
+   */
+  public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
     AtomicBoolean reconstructState = new AtomicBoolean(false);
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null)
@@ -1742,8 +1858,8 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /* package-private for testing */
-  Set<CollectionStateWatcher> getStateWatchers(String collection) {
-    final Set<CollectionStateWatcher> watchers = new HashSet<>();
+  Set<DocCollectionWatcher> getStateWatchers(String collection) {
+    final Set<DocCollectionWatcher> watchers = new HashSet<>();
     collectionWatches.compute(collection, (k, v) -> {
       if (v != null) {
         watchers.addAll(v.stateWatchers);
@@ -1845,12 +1961,12 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
+  private void notifyStateWatchers(String collection, DocCollection collectionState) {
     if (this.closed) {
       return;
     }
     try {
-      notifications.submit(new Notification(liveNodes, collection, collectionState));
+      notifications.submit(new Notification(collection, collectionState));
     }
     catch (RejectedExecutionException e) {
       if (closed == false) {
@@ -1861,29 +1977,27 @@ public class ZkStateReader implements SolrCloseable {
 
   private class Notification implements Runnable {
 
-    final Set<String> liveNodes;
     final String collection;
     final DocCollection collectionState;
 
-    private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) {
-      this.liveNodes = liveNodes;
+    private Notification(String collection, DocCollection collectionState) {
       this.collection = collection;
       this.collectionState = collectionState;
     }
 
     @Override
     public void run() {
-      List<CollectionStateWatcher> watchers = new ArrayList<>();
+      List<DocCollectionWatcher> watchers = new ArrayList<>();
       collectionWatches.compute(collection, (k, v) -> {
         if (v == null)
           return null;
         watchers.addAll(v.stateWatchers);
         return v;
       });
-      for (CollectionStateWatcher watcher : watchers) {
+      for (DocCollectionWatcher watcher : watchers) {
         try {
-          if (watcher.onStateChanged(liveNodes, collectionState)) {
-            removeCollectionStateWatcher(collection, watcher);
+          if (watcher.onStateChanged(collectionState)) {
+            removeDocCollectionWatcher(collection, watcher);
           }
         } catch (Exception exception) {
           log.warn("Error on calling watcher", exception);
@@ -2118,4 +2232,54 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
+  /** Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener}
+   * while wrapping and delegating to a {@link CollectionStateWatcher}. Equality is based on
+   * (collectionName, delegate), so a new wrapper around the same watcher can be used to unregister it.
+   */
+  private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener {
+    private final String collectionName;
+    private final CollectionStateWatcher delegate;
+
+    public int hashCode() { // NOTE(review): overrides Object.hashCode but lacks @Override
+      return collectionName.hashCode() * delegate.hashCode();
+    }
+    
+    public boolean equals(Object other) { // NOTE(review): overrides Object.equals but lacks @Override
+      if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) {
+        DocCollectionAndLiveNodesWatcherWrapper that
+          = (DocCollectionAndLiveNodesWatcherWrapper) other;
+        return this.collectionName.equals(that.collectionName)
+          && this.delegate.equals(that.delegate);
+      }
+      return false;
+    }
+    
+    public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName,
+                                                   final CollectionStateWatcher delegate) {
+      this.collectionName = collectionName;
+      this.delegate = delegate;
+    }
+    
+    @Override
+    public boolean onStateChanged(DocCollection collectionState) { // collection changed: pair it with the current live nodes
+      final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes,
+                                                     collectionState);
+      if (result) {
+        // it might be a while before the live nodes change, so proactively remove ourselves
+        removeLiveNodesListener(this);
+      }
+      return result;
+    }
+    
+    @Override
+    public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) { // live nodes changed: pair with current collection
+      final DocCollection collection = ZkStateReader.this.clusterState.getCollectionOrNull(collectionName);
+      final boolean result = delegate.onStateChanged(newLiveNodes, collection);
+      if (result) {
+        // it might be a while before the collection changes, so proactively remove ourselves
+        removeDocCollectionWatcher(collectionName, this);
+      }
+      return result;
+    }
+  }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 89cba06..f97b537 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -37,20 +37,22 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** @see TestDocCollectionWatcher */
 public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final int CLUSTER_SIZE = 4;
-  private static final int MAX_WAIT_TIMEOUT = 30;
+
+  private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
 
   private ExecutorService executor = null;
 
   @Before
   public void prepareCluster() throws Exception {
     configureCluster(CLUSTER_SIZE)
-    .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
-    .configure();
+      .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+      .configure();
     executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
   }
   
@@ -75,6 +77,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
   private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
       throws InterruptedException, ExecutionException {
+    
     Future<Boolean> future = executor.submit(() -> {
       try {
         while (true) {
@@ -100,50 +103,71 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   }
 
   @Test
-  //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
-  public void testSimpleCollectionWatch() throws Exception {
-
+  public void testCollectionWatchWithShutdownOfActiveNode() throws Exception {
+    doTestCollectionWatchWithNodeShutdown(false);
+  }
+  
+  @Test
+  public void testCollectionWatchWithShutdownOfUnusedNode() throws Exception {
+    doTestCollectionWatchWithNodeShutdown(true);
+  }
+  
+  private void doTestCollectionWatchWithNodeShutdown(final boolean shutdownUnusedNode)
+    throws Exception {
+    
     CloudSolrClient client = cluster.getSolrClient();
-    CollectionAdminRequest.createCollection("testcollection", "config", 4, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+    // note: one node (extraJetty, started after creation below) will be unused by the collection
+    CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
+                        (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
 
+    final JettySolrRunner extraJetty = cluster.startJettySolrRunner();
+    final JettySolrRunner jettyToShutdown
+      = shutdownUnusedNode ? extraJetty : cluster.getJettySolrRunners().get(0);
+    final int expectedNodesWithActiveReplicas = CLUSTER_SIZE - (shutdownUnusedNode ? 0 : 1);
+    
+    cluster.waitForAllNodes(MAX_WAIT_TIMEOUT);
+    
     // shutdown a node and check that we get notified about the change
     final CountDownLatch latch = new CountDownLatch(1);
     client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
-      int nodeCount = 0;
+      int nodesWithActiveReplicas = 0;
       log.info("State changed: {}", collectionState);
       for (Slice slice : collectionState) {
         for (Replica replica : slice) {
           if (replica.isActive(liveNodes))
-            nodeCount++;
+            nodesWithActiveReplicas++;
         }
       }
-      if (nodeCount == 3) {
+      if (liveNodes.size() == CLUSTER_SIZE // CLUSTER_SIZE + extraJetty - the stopped node
+          && expectedNodesWithActiveReplicas == nodesWithActiveReplicas) {
         latch.countDown();
         return true;
       }
       return false;
     });
 
-    JettySolrRunner j = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
-    cluster.waitForJettyToStop(j);
-    assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    cluster.stopJettySolrRunner(jettyToShutdown);
+    cluster.waitForJettyToStop(jettyToShutdown);
+    
+    assertTrue("CollectionStateWatcher was never notified of cluster change",
+               latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
 
-    waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS,
-        () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
+    waitFor("CollectionStateWatcher wasn't cleared after completion",
+            MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
 
   }
 
   @Test
-  // commented 20-July-2018  @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
   public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     final CountDownLatch latch = new CountDownLatch(1);
     client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -151,9 +175,10 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
       return false;
     });
 
-    assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    assertTrue("CollectionStateWatcher isn't called on new registration",
+               latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
     assertEquals("CollectionStateWatcher should be retained",
-        1, client.getZkStateReader().getStateWatchers("currentstate").size());
+                 1, client.getZkStateReader().getStateWatchers("currentstate").size());
 
     final CountDownLatch latch2 = new CountDownLatch(1);
     client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -162,9 +187,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
     });
 
     assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
-        latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
-    waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS,
-        () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
+               latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
   }
 
   @Test
@@ -172,17 +197,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     // several goes, to check that we're not getting delayed state changes
     for (int i = 0; i < 10; i++) {
       try {
-        client.waitForState("waitforstate", 1, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
-      }
-      catch (TimeoutException e) {
+        client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
+                            (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+      } catch (TimeoutException e) {
         fail("waitForState should return immediately if the predicate is already satisfied");
       }
     }
@@ -190,114 +215,145 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   }
 
   @Test
-  // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
   public void testCanWaitForNonexistantCollection() throws Exception {
 
     Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+                                              (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
-        .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
+      .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
 
     assertTrue("waitForState was not triggered by collection creation", future.get());
 
   }
 
   @Test
-  // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
   public void testPredicateFailureTimesOut() throws Exception {
     CloudSolrClient client = cluster.getSolrClient();
     expectThrows(TimeoutException.class, () -> {
-      client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
+      client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
+                          ((liveNodes, collectionState) -> false));
     });
-    waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
-        () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
+    waitFor("Watchers for collection should be removed after timeout",
+            MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
 
   }
 
   @Test
-  //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
   public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
 
     CloudSolrClient client = cluster.getSolrClient();
     CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
 
     client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
+                        (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
 
     final CountDownLatch firstCall = new CountDownLatch(1);
 
     // stop a node, then add a watch waiting for all nodes to be back up
-    JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
+    JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt
+                                                        (cluster.getJettySolrRunners().size()));
     
     cluster.waitForJettyToStop(node1);
 
-    Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+    Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (liveNodes, collectionState) -> {
           firstCall.countDown();
           return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1);
         });
 
     // first, stop another node; the watch should not be fired after this!
-    JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
+    JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt
+                                                        (cluster.getJettySolrRunners().size()));
 
     // now start them both back up
     cluster.startJettySolrRunner(node1);
-    assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    assertTrue("CollectionStateWatcher not called",
+               firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
     cluster.startJettySolrRunner(node2);
 
     Boolean result = future.get();
-    assertTrue("Did not see a fully active cluster after 30 seconds", result);
+    assertTrue("Did not see a fully active cluster", result);
 
   }
 
   @Test
-  // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
   public void testWatcherIsRemovedAfterTimeout() throws Exception {
     CloudSolrClient client = cluster.getSolrClient();
     assertTrue("There should be no watchers for a non-existent collection!",
-        client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+               client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
 
     expectThrows(TimeoutException.class, () -> {
-      client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+      client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
+                          (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
     });
 
-    waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
-        () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+    waitFor("Watchers for collection should be removed after timeout",
+            MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
 
   }
 
   @Test
-  // commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
   public void testDeletionsTriggerWatches() throws Exception {
     CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
-        .process(cluster.getSolrClient());
-    Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null);
+      .process(cluster.getSolrClient());
+    
+    Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (l, c) -> c == null); // a deleted collection is reported as null
 
     CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient());
 
-    assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get());
+    assertTrue("CollectionStateWatcher not notified of delete call", future.get());
+  }
+  
+  @Test
+  public void testLiveNodeChangesTriggerWatches() throws Exception {
+    final CloudSolrClient client = cluster.getSolrClient();
+    
+    CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
+
+    Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (l, c) -> (l.size() == 1 + CLUSTER_SIZE)); // extra node has joined
+    
+    JettySolrRunner unusedJetty = cluster.startJettySolrRunner(); // started after creation, so hosts no replica
+    assertTrue("CollectionStateWatcher not notified of new node", future.get());
+    
+    waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
+
+    future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                              (l, c) -> (l.size() == CLUSTER_SIZE)); // extra node is gone again
+
+    cluster.stopJettySolrRunner(unusedJetty);
+    
+    assertTrue("CollectionStateWatcher not notified of node lost", future.get());
+    
+    waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
+    
   }
 
   @Test
-  //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
   public void testWatchesWorkForStateFormat1() throws Exception {
 
     final CloudSolrClient client = cluster.getSolrClient();
 
     Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+                                              (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get());
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+    assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
+               future.get());
 
-    Future<Boolean> migrated
-        = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-              (n, c) -> c != null && c.getStateFormat() == 2);
+    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                                (n, c) -> c != null && c.getStateFormat() == 2);
 
-    CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
+    CollectionAdminRequest.migrateCollectionFormat("stateformat1")
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
     assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
 
   }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
new file mode 100644
index 0000000..f024a1c
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link DocCollectionWatcher} and the {@code Predicate<DocCollection>} based
+ * {@code waitForState} APIs, for callers that only care about the state of a collection
+ * and not about live nodes.
+ *
+ * @see TestCollectionStateWatchers
+ */
+public class TestDocCollectionWatcher extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int CLUSTER_SIZE = 4;
+
+  private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
+
+  /** Runs the background waits and polls used by the tests; recreated for every test. */
+  private ExecutorService executor = null;
+
+  @Before
+  public void prepareCluster() throws Exception {
+    configureCluster(CLUSTER_SIZE)
+      .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+      .configure();
+    executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
+  }
+
+  @After
+  public void tearDownCluster() throws Exception {
+    // executor is still null if prepareCluster() failed before creating it; don't mask that with an NPE
+    if (null != executor) {
+      executor.shutdown();
+      executor = null;
+    }
+    shutdownCluster();
+  }
+
+  /**
+   * Asynchronously waits for {@code predicate} to match the state of {@code collection}.
+   *
+   * @return a future yielding {@code true} if the predicate matched within the timeout, or
+   *         {@code false} if the wait timed out or was interrupted
+   */
+  private Future<Boolean> waitInBackground(String collection, long timeout, TimeUnit unit,
+                                           Predicate<DocCollection> predicate) {
+    return executor.submit(() -> {
+      try {
+        cluster.getSolrClient().waitForState(collection, timeout, unit, predicate);
+      } catch (InterruptedException | TimeoutException e) {
+        return Boolean.FALSE;
+      }
+      return Boolean.TRUE;
+    });
+  }
+
+  /**
+   * Polls {@code predicate} every 10ms until it returns {@code true}, failing the test with
+   * {@code message} if that does not happen within the given timeout.
+   */
+  private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
+      throws InterruptedException, ExecutionException {
+
+    Future<Boolean> future = executor.submit(() -> {
+      try {
+        while (true) {
+          if (predicate.call()) {
+            return true;
+          }
+          TimeUnit.MILLISECONDS.sleep(10);
+        }
+      } catch (InterruptedException e) {
+        // e.g. interrupted by future.cancel(true) below once the timeout has expired
+        return false;
+      }
+    });
+    try {
+      if (Boolean.TRUE.equals(future.get(timeout, unit))) {
+        return;
+      }
+    } catch (TimeoutException e) {
+      // fall through: stop the poller and fail with the caller's message
+    }
+    future.cancel(true);
+    fail(message);
+  }
+
+  @Test
+  public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
+
+    CloudSolrClient client = cluster.getSolrClient();
+    CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+    // a watcher that returns false should be called on registration and then retained...
+    final CountDownLatch latch = new CountDownLatch(1);
+    client.registerDocCollectionWatcher("currentstate", (c) -> {
+      latch.countDown();
+      return false;
+    });
+
+    assertTrue("DocCollectionWatcher isn't called on new registration",
+               latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    assertEquals("DocCollectionWatcher should be retained",
+                 1, client.getZkStateReader().getStateWatchers("currentstate").size());
+
+    // ...while a second watcher that returns true should be called and then removed
+    final CountDownLatch latch2 = new CountDownLatch(1);
+    client.registerDocCollectionWatcher("currentstate", (c) -> {
+      latch2.countDown();
+      return true;
+    });
+
+    // must await latch2: the first latch was already counted down above, so it proves nothing here
+    assertTrue("DocCollectionWatcher isn't called when registering for already-watched collection",
+               latch2.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+    waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
+  }
+
+  @Test
+  public void testWaitForStateChecksCurrentState() throws Exception {
+
+    CloudSolrClient client = cluster.getSolrClient();
+    CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+    client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+
+    // several goes, to check that we're not getting delayed state changes
+    for (int i = 0; i < 10; i++) {
+      try {
+        client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
+                            (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+      } catch (TimeoutException e) {
+        fail("waitForState should return immediately if the predicate is already satisfied");
+      }
+    }
+
+  }
+
+  @Test
+  public void testCanWaitForNonexistantCollection() throws Exception {
+
+    Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (c) -> (null != c));
+
+    CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
+      .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
+
+    assertTrue("waitForState was not triggered by collection creation", future.get());
+
+  }
+
+  @Test
+  public void testPredicateFailureTimesOut() throws Exception {
+    CloudSolrClient client = cluster.getSolrClient();
+    expectThrows(TimeoutException.class, () -> {
+      client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
+                          ((liveNodes, collectionState) -> false));
+    });
+    waitFor("Watchers for collection should be removed after timeout",
+            MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
+
+  }
+
+  @Test
+  public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
+
+    CloudSolrClient client = cluster.getSolrClient();
+    CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1)
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+    // wait for the 1 shard, 1 replica collection to be fully active...
+    client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+
+    // set watcher waiting for at least 3 replicas (will fail initially)
+    final AtomicInteger runCount = new AtomicInteger(0);
+    final Future<Boolean> future = waitInBackground
+      ("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (collectionState) -> {
+        runCount.incrementAndGet();
+        if (null == collectionState) { // null means the collection doesn't exist
+          return false;
+        }
+        int replicas = 0;
+        for (Slice slice : collectionState) {
+          replicas += slice.getReplicas().size();
+        }
+        return 3 <= replicas;
+      });
+
+    // add a 2nd replica...
+    CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+    client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 2));
+
+    // confirm watcher has run at least once and has been retained...
+    final int runCountSnapshot = runCount.get();
+    assertTrue("watcher should have run at least once", 0 < runCountSnapshot);
+    assertEquals(1, client.getZkStateReader().getStateWatchers("falsepredicate").size());
+
+    // now add a 3rd replica...
+    CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+    // now confirm watcher is invoked & removed
+    assertTrue("watcher never succeeded", future.get());
+    assertTrue("watcher should have run again after the 3rd replica", runCountSnapshot < runCount.get());
+    waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("falsepredicate").size() == 0);
+
+  }
+
+  @Test
+  public void testWatcherIsRemovedAfterTimeout() throws Exception {
+    CloudSolrClient client = cluster.getSolrClient();
+    assertTrue("There should be no watchers for a non-existent collection!",
+               client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+
+    expectThrows(TimeoutException.class, () -> {
+      client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
+                          (c) -> (false));
+    });
+
+    waitFor("Watchers for collection should be removed after timeout",
+            MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+            () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+
+  }
+
+  @Test
+  public void testDeletionsTriggerWatches() throws Exception {
+    final CloudSolrClient client = cluster.getSolrClient();
+    CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1).process(client);
+
+    client.waitForState("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+
+    Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (c) -> c == null);
+
+    CollectionAdminRequest.deleteCollection("tobedeleted").process(client);
+
+    assertTrue("DocCollectionWatcher not notified of delete call", future.get());
+  }
+
+  @Test
+  public void testWatchesWorkForStateFormat1() throws Exception {
+
+    final CloudSolrClient client = cluster.getSolrClient();
+
+    Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                              (c) -> (null != c));
+
+    CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+    client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                        (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+
+    assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
+               future.get());
+
+    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+                                                (c) -> c != null && c.getStateFormat() == 2);
+
+    CollectionAdminRequest.migrateCollectionFormat("stateformat1")
+      .processAndWait(client, MAX_WAIT_TIMEOUT);
+    assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get());
+
+  }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index eb2fd26..c9332bb 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -205,7 +205,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       throws Exception {
     log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
 
-    zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+    zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
       if (docCollection == null)
         return true;
       return false;
@@ -237,7 +237,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
       Thread.sleep(100);
     }
 
-    zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+    zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
       if (docCollection == null)
         return false;
 
@@ -253,7 +253,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
   public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName,
       Replica.State expectedState) throws InterruptedException, TimeoutException {
     reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
-        (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
+        (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null // liveNodes unused: DocCollection-only predicate
             && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
             && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
   }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index cdb6f68..ff87246 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -405,7 +405,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
         .setCreateNodeSet("")
         .process(cloudClient).getStatus());
     
-    cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount);
+    cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (c) -> c != null && c.getSlices().size() == sliceCount);
     
     ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
 
@@ -585,7 +585,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
     AtomicInteger nReplicas = new AtomicInteger();
     try {
-      client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> {
+      client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (c) -> {
         if (c == null)
           return false;
         int numReplicas = getTotalReplicas(c, c.getName());
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 5faa307..a97832b 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
       }
       
       for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
-        reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false);
+        reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
       }
      
     }