You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2022/07/20 19:01:13 UTC

[solr] branch main updated: SOLR-16257: Improve ZkStateReader to avoid race condition (#909)

This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 97458a503bc SOLR-16257: Improve ZkStateReader to avoid race condition (#909)
97458a503bc is described below

commit 97458a503bc0e5a4f52c0dc4b211d4d79cd1bccd
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Wed Jul 20 12:01:08 2022 -0700

    SOLR-16257: Improve ZkStateReader to avoid race condition (#909)
    
    Race condition existed between collectionWatches and watchedCollectionStates.
---
 solr/CHANGES.txt                                   |   3 +
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 613 ++++++++++++++++-----
 .../apache/solr/common/cloud/ZkStateReader.java    | 272 +++++----
 .../solr/common/util/CommonTestInjection.java      |  34 ++
 4 files changed, 696 insertions(+), 226 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 777facf0372..e18d37d7492 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -59,6 +59,9 @@ Improvements
 
 * SOLR-16225: Upgrade dependencies (Carrot2, HPPC) (Dawid Weiss)
 
+* SOLR-16257: Improve ZkStateReader to avoid race condition between collectionWatches and watchedCollectionStates
+  (Patson Luk, Houston Putman, Mike Drob)
+
 Optimizations
 ---------------------
 * SOLR-16120: Optimise hl.fl expansion. (Christine Poerschke, David Smiley, Mike Drob)
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 639c7538a46..ac34bfaa1ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -16,11 +16,20 @@
  */
 package org.apache.solr.cloud.overseer;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.OverseerTest;
@@ -29,171 +38,511 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.CommonTestInjection;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkStateReaderTest extends SolrTestCaseJ4 {
-
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final long TIMEOUT = 30;
 
-  public void testExternalCollectionWatchedNotWatched() throws Exception {
-    Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
-    ZkTestServer server = new ZkTestServer(zkDir);
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-      // create new collection
-      ZkWriteCommand c1 =
-          new ZkWriteCommand(
-              "c1",
-              new DocCollection(
-                  "c1",
-                  new HashMap<>(),
-                  Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                  DocRouter.DEFAULT,
-                  0));
-      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-      writer.writePendingUpdates();
-      reader.forceUpdateCollection("c1");
-
-      assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
-      reader.registerCore("c1");
-      assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
-      reader.unregisterCore("c1");
-      assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+  private static class TestFixture implements Closeable {
+    private final ZkTestServer server;
+    private final SolrZkClient zkClient;
+    private final ZkStateReader reader;
+    private final ZkStateWriter writer;
+
+    private TestFixture(
+        ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) {
+      this.server = server;
+      this.zkClient = zkClient;
+      this.reader = reader;
+      this.writer = writer;
+    }
 
-    } finally {
+    @Override
+    public void close() throws IOException {
       IOUtils.close(reader, zkClient);
-      server.shutdown();
+      try {
+        server.shutdown();
+      } catch (InterruptedException e) {
+        // ok. Shutting down anyway
+      }
     }
   }
 
-  public void testCollectionStateWatcherCaching() throws Exception {
-    Path zkDir = createTempDir("testCollectionStateWatcherCaching");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
+  private TestFixture fixture = null;
 
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    fixture = setupTestFixture(getTestName());
+  }
 
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-      DocCollection state =
-          new DocCollection(
-              "c1",
-              new HashMap<>(),
-              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-              DocRouter.DEFAULT,
-              0);
-      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
-      assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
-      reader.waitForState(
-          "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
-
-      Map<String, Object> props = new HashMap<>();
-      props.put("x", "y");
-      props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
-      state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
-      wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
-
-      boolean found = false;
-      TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-      while (!timeOut.hasTimedOut()) {
-        DocCollection c1 = reader.getClusterState().getCollection("c1");
-        if ("y".equals(c1.getStr("x"))) {
-          found = true;
-          break;
-        }
-      }
-      assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
-    } finally {
-      IOUtils.close(reader, zkClient);
-      server.shutdown();
+  @After
+  public void tearDown() throws Exception {
+    if (fixture != null) {
+      fixture.close();
     }
+    super.tearDown();
   }
 
-  public void testWatchedCollectionCreation() throws Exception {
-    Path zkDir = createTempDir("testWatchedCollectionCreation");
-
+  private static TestFixture setupTestFixture(String testPrefix) throws Exception {
+    Path zkDir = createTempDir(testPrefix);
     ZkTestServer server = new ZkTestServer(zkDir);
+    server.run();
+    SolrZkClient zkClient =
+        new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+    ZkController.createClusterZkNodes(zkClient);
 
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
+    ZkStateReader reader = new ZkStateReader(zkClient);
+    reader.createClusterStateWatchersAndUpdate();
 
-    try {
-      server.run();
+    ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
+    return new TestFixture(server, zkClient, reader, writer);
+  }
 
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-      reader.registerCore("c1");
+  public void testExternalCollectionWatchedNotWatched() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    // create new collection
+    ZkWriteCommand c1 =
+        new ZkWriteCommand(
+            "c1",
+            new DocCollection(
+                "c1",
+                new HashMap<>(),
+                Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+                DocRouter.DEFAULT,
+                0));
+
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+    writer.writePendingUpdates();
+    reader.forceUpdateCollection("c1");
+
+    assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+    reader.registerCore("c1");
+    assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+    reader.unregisterCore("c1");
+    assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+  }
 
-      // Initially there should be no c1 collection.
-      assertNull(reader.getClusterState().getCollectionRef("c1"));
+  public void testCollectionStateWatcherCaching() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+    reader.waitForState(
+        "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
+
+    Map<String, Object> props = new HashMap<>();
+    props.put("x", "y");
+    props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
+    state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
+    wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    boolean found = false;
+    TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeOut.hasTimedOut()) {
+      DocCollection c1 = reader.getClusterState().getCollection("c1");
+      if ("y".equals(c1.getStr("x"))) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
+  }
 
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-      reader.forceUpdateCollection("c1");
+  public void testWatchedCollectionCreation() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1");
+
+    // Initially there should be no c1 collection.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+    reader.forceUpdateCollection("c1");
+
+    // Still no c1 collection, despite a collection path.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    // create new collection
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+    // reader.forceUpdateCollection("c1");
+    reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
+    ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+  }
 
-      // Still no c1 collection, despite a collection path.
-      assertNull(reader.getClusterState().getCollectionRef("c1"));
+  /**
+   * Verifies that znode and child versions are correct and version changes trigger cluster state
+   * updates
+   */
+  public void testNodeVersion() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    ClusterState clusterState = reader.getClusterState();
+    // create new collection
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+    clusterState = writer.writePendingUpdates();
+
+    // have to register it here after the updates, otherwise the child node watch will not be
+    // inserted
+    reader.registerCore("c1");
+
+    TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    timeOut.waitFor(
+        "Timeout on waiting for c1 to show up in cluster state",
+        () ->
+            reader.getClusterState().getCollectionRef("c1") != null
+                && reader.getClusterState().getCollectionRef("c1").get() != null);
+
+    ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(0, ref.get().getZNodeVersion());
+    assertEquals(-1, ref.get().getChildNodesVersion());
+
+    DocCollection collection = ref.get();
+    PerReplicaStates prs =
+        PerReplicaStates.fetch(
+            collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
+    PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
+        .persist(collection.getZNode(), fixture.zkClient);
+    timeOut.waitFor(
+        "Timeout on waiting for c1 updated to have PRS state r1",
+        () -> {
+          DocCollection c = reader.getCollection("c1");
+          return c.getPerReplicaStates() != null
+              && c.getPerReplicaStates().get("r1") != null
+              && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
+        });
+
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
+    assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now
+
+    prs = ref.get().getPerReplicaStates();
+    PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
+        .persist(collection.getZNode(), fixture.zkClient);
+    timeOut.waitFor(
+        "Timeout on waiting for c1 updated to have PRS state r1 marked as DOWN",
+        () ->
+            reader.getCollection("c1").getPerReplicaStates().get("r1").state
+                == Replica.State.ACTIVE);
+
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
+    // but child version should be 3 now (1 del + 1 add)
+    assertEquals(3, ref.get().getChildNodesVersion());
+
+    // now delete the collection
+    wc = new ZkWriteCommand("c1", null);
+    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+    clusterState = writer.writePendingUpdates();
+    timeOut.waitFor(
+        "Timeout on waiting for c1 to be removed from cluster state",
+        () -> reader.getClusterState().getCollectionRef("c1").get() == null);
+
+    reader.unregisterCore("c1");
+    // re-add the same collection
+    wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+    clusterState = writer.writePendingUpdates();
+    // re-register, otherwise the child watch would be missing from collection deletion
+    reader.registerCore("c1");
+
+    // reader.forceUpdateCollection("c1");
+    timeOut.waitFor(
+        "Timeout on waiting for c1 to show up in cluster state again",
+        () ->
+            reader.getClusterState().getCollectionRef("c1") != null
+                && reader.getClusterState().getCollectionRef("c1").get() != null);
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(0, ref.get().getZNodeVersion());
+    assertEquals(-1, ref.get().getChildNodesVersion()); // child node version is reset
+
+    // re-add PRS
+    collection = ref.get();
+    prs =
+        PerReplicaStates.fetch(
+            collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
+    PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
+        .persist(collection.getZNode(), fixture.zkClient);
+    timeOut.waitFor(
+        "Timeout on waiting for c1 updated to have PRS state r1",
+        () -> {
+          DocCollection c = reader.getCollection("c1");
+          return c.getPerReplicaStates() != null
+              && c.getPerReplicaStates().get("r1") != null
+              && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
+        });
+
+    ref = reader.getClusterState().getCollectionRef("c1");
+
+    // child version should be reset since the state.json node was deleted and re-created
+    assertEquals(1, ref.get().getChildNodesVersion());
+  }
 
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+  public void testForciblyRefreshAllClusterState() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1"); // watching c1, so it should get non lazy reference
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    // Initially there should be no c1 collection.
+    assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+    // create new collection
+    DocCollection state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(0, ref.get().getZNodeVersion());
+
+    // update the collection
+    state =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            ref.get().getZNodeVersion());
+    wc = new ZkWriteCommand("c1", state);
+    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertNotNull(ref);
+    assertFalse(ref.isLazilyLoaded());
+    assertEquals(1, ref.get().getZNodeVersion());
+
+    // delete the collection c1, add a collection c2 that is NOT watched
+    ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);
+
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+    state =
+        new DocCollection(
+            "c2",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
+
+    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+    writer.writePendingUpdates();
+
+    reader.forciblyRefreshAllClusterStateSlow();
+    ref = reader.getClusterState().getCollectionRef("c1");
+    assertNull(ref);
+
+    ref = reader.getClusterState().getCollectionRef("c2");
+    assertNotNull(ref);
+    assertTrue(
+        "c2 should have been lazily loaded but is not!",
+        ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched
+    assertEquals(0, ref.get().getZNodeVersion());
+  }
 
-      // create new collection
-      DocCollection state =
-          new DocCollection(
-              "c1",
-              new HashMap<>(),
-              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-              DocRouter.DEFAULT,
-              0);
-      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
-      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
-      writer.writePendingUpdates();
+  public void testGetCurrentCollections() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    reader.registerCore("c1"); // listen to c1. not yet exist
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+    reader.forceUpdateCollection("c1");
+    Set<String> currentCollections = reader.getCurrentCollections();
+    assertEquals(0, currentCollections.size()); // no active collections yet
+
+    // now create both c1 (watched) and c2 (not watched)
+    DocCollection state1 =
+        new DocCollection(
+            "c1",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+    ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
+    DocCollection state2 =
+        new DocCollection(
+            "c2",
+            new HashMap<>(),
+            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+            DocRouter.DEFAULT,
+            0);
+
+    // do not listen to c2
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+    ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);
+
+    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+    writer.writePendingUpdates();
+
+    reader.forceUpdateCollection("c1");
+    reader.forceUpdateCollection("c2");
+
+    // should detect both collections (c1 watched, c2 lazy loaded)
+    currentCollections = reader.getCurrentCollections();
+    assertEquals(2, currentCollections.size());
+  }
 
-      assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+  /**
+   * Simulates the race condition that might arise when state updates triggered by watch
+   * notifications contend with removal of collection watches.
+   *
+   * <p>Such a race condition should no longer exist with the new code that uses a single map for
+   * both "collection watches" and "latest state of watched collection"
+   */
+  public void testWatchRaceCondition() throws Exception {
+    ExecutorService executorService =
+        ExecutorUtil.newMDCAwareSingleThreadExecutor(
+            new SolrNamedThreadFactory("zkStateReaderTest"));
+    CommonTestInjection.setDelay(1000);
+    final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
+    try {
+      ZkStateWriter writer = fixture.writer;
+      final ZkStateReader reader = fixture.reader;
+      fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      // start another thread that constantly updates the state
+      final AtomicReference<Exception> updateException = new AtomicReference<>();
+      executorService.submit(
+          () -> {
+            try {
+              ClusterState clusterState = reader.getClusterState();
+              while (!stopMutatingThread.get()) {
+                DocCollection collection = clusterState.getCollectionOrNull("c1");
+                int currentVersion = collection != null ? collection.getZNodeVersion() : 0;
+                // create new collection
+                DocCollection state =
+                    new DocCollection(
+                        "c1",
+                        new HashMap<>(),
+                        Map.of(
+                            ZkStateReader.CONFIGNAME_PROP,
+                            ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+                        DocRouter.DEFAULT,
+                        currentVersion);
+                ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+                writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+                clusterState = writer.writePendingUpdates();
+                TimeUnit.MILLISECONDS.sleep(100);
+              }
+            } catch (Exception e) {
+              updateException.set(e);
+            }
+            return null;
+          });
+      executorService.shutdown();
 
-      // reader.forceUpdateCollection("c1");
-      reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
-      ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
-      assertNotNull(ref);
-      assertFalse(ref.isLazilyLoaded());
+      reader.waitForState(
+          "c1",
+          10,
+          TimeUnit.SECONDS,
+          slices -> slices != null); // wait for the state to become available
+
+      final CountDownLatch latch = new CountDownLatch(2);
+
+      // remove itself on 2nd trigger
+      DocCollectionWatcher dummyWatcher =
+          collection -> {
+            latch.countDown();
+            return latch.getCount() == 0;
+          };
+      reader.registerDocCollectionWatcher("c1", dummyWatcher);
+      assertTrue(
+          "Missing expected collection updates after the wait", latch.await(10, TimeUnit.SECONDS));
+      reader.removeDocCollectionWatcher("c1", dummyWatcher);
+
+      // cluster state might not be updated right away after the removeDocCollectionWatcher call
+      // above as org.apache.solr.common.cloud.ZkStateReader.Notification might remove the watcher
+      // as well and might still be in the middle of updating the cluster state.
+      TimeOut timeOut = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+      timeOut.waitFor(
+          "The ref is not lazily loaded after waiting",
+          () -> reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+
+      if (updateException.get() != null) {
+        throw (updateException.get());
+      }
     } finally {
-      IOUtils.close(reader, zkClient);
-      server.shutdown();
+      stopMutatingThread.set(true);
+      CommonTestInjection.reset();
+      ExecutorUtil.awaitTermination(executorService);
     }
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index a2f9864525b..353500581c2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -23,7 +23,6 @@ import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -55,6 +55,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.CommonTestInjection;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Pair;
@@ -171,10 +172,6 @@ public class ZkStateReader implements SolrCloseable {
   public static final String SHARD_LEADERS_ZKNODE = "leaders";
   public static final String ELECTION_NODE = "election";
 
-  /** "Interesting" and actively watched Collections. */
-  private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates =
-      new ConcurrentHashMap<>();
-
   /** "Interesting" but not actively watched Collections. */
   private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates =
       new ConcurrentHashMap<>();
@@ -195,8 +192,11 @@ public class ZkStateReader implements SolrCloseable {
 
   private final Runnable securityNodeListener;
 
-  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches =
-      new ConcurrentHashMap<>();
+  /**
+   * Collections with active watches. The {@link StatefulCollectionWatch} inside for each collection
+   * might also contain the latest DocCollection (state) observed
+   */
+  private DocCollectionWatches collectionWatches = new DocCollectionWatches();
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the
   // PropsWatcher map.
@@ -246,6 +246,140 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
+  /**
+   * A ConcurrentHashMap-backed registry of active watches, keyed by collection name.
+   *
+   * <p>Each {@link StatefulCollectionWatch} also holds the latest DocCollection (state) observed.
+   */
+  private static class DocCollectionWatches {
+    private final ConcurrentHashMap<String, StatefulCollectionWatch>
+        statefulWatchesByCollectionName = new ConcurrentHashMap<>();
+
+    /**
+     * Gets the DocCollection (state) of the collection which the corresponding watch last observed
+     *
+     * @param collection the collection name to get DocCollection on
+     * @return the last observed DocCollection (state); null means the collection is not being
+     *     watched or no state has been observed for it (e.g. it does not exist)
+     */
+    private DocCollection getDocCollection(String collection) {
+      StatefulCollectionWatch watch = statefulWatchesByCollectionName.get(collection);
+      return watch != null ? watch.currentState : null;
+    }
+
+    /**
+     * Gets the active collections (collections that exist) being watched
+     *
+     * @return an immutable set of active collection names
+     */
+    private Set<String> activeCollections() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .map(Entry::getKey)
+          .collect(Collectors.toUnmodifiableSet());
+    }
+
+    /**
+     * Gets the count of active collections (collections that exist) being watched
+     *
+     * @return the count of active collections
+     */
+    private long activeCollectionCount() {
+      return statefulWatchesByCollectionName.entrySet().stream()
+          .filter(
+              (Entry<String, StatefulCollectionWatch> entry) ->
+                  entry.getValue().currentState != null)
+          .count();
+    }
+
+    /**
+     * Gets the watched collection names as a thread-safe, unmodifiable live view of the key set.
+     *
+     * @return Set of watched collection names
+     */
+    private Set<String> watchedCollections() {
+      return Collections.unmodifiableSet(statefulWatchesByCollectionName.keySet());
+    }
+
+    private Set<Entry<String, StatefulCollectionWatch>> watchedCollectionEntries() {
+      return Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet());
+    }
+
+    /**
+     * Updates the latest observed DocCollection (state) of the {@link StatefulCollectionWatch} if
+     * the collection is being watched
+     *
+     * @param collection the collection name
+     * @param newState the new DocCollection (state) observed
+     * @return whether the state has changed for the watched collection
+     */
+    private boolean updateDocCollection(String collection, DocCollection newState) {
+      AtomicBoolean stateHasChanged = new AtomicBoolean(false);
+      statefulWatchesByCollectionName.computeIfPresent(
+          collection,
+          (col, watch) -> {
+            DocCollection oldState = watch.currentState;
+            if (oldState == null && newState == null) {
+              // OK, the collection does not yet exist in ZK or has already been deleted
+            } else if (oldState == null) {
+              if (log.isDebugEnabled()) {
+                log.debug("Add data for [{}] ver [{}]", collection, newState.getZNodeVersion());
+              }
+              watch.currentState = newState;
+            } else if (newState == null) {
+              log.debug("Removing cached collection state for [{}]", collection);
+              watch.currentState = null;
+            } else { // both new and old states are non-null
+              int oldCVersion =
+                  oldState.getPerReplicaStates() == null
+                      ? -1
+                      : oldState.getPerReplicaStates().cversion;
+              int newCVersion =
+                  newState.getPerReplicaStates() == null
+                      ? -1
+                      : newState.getPerReplicaStates().cversion;
+              if (oldState.getZNodeVersion() < newState.getZNodeVersion()
+                  || oldCVersion < newCVersion) {
+                watch.currentState = newState;
+                if (log.isDebugEnabled()) {
+                  log.debug(
+                      "Updating data for [{}] from [{}] to [{}]",
+                      collection,
+                      oldState.getZNodeVersion(),
+                      newState.getZNodeVersion());
+                }
+              }
+            }
+            stateHasChanged.set(oldState != watch.currentState);
+            return watch;
+          });
+
+      return stateHasChanged.get();
+    }
+
+    /**
+     * Computes the new StatefulCollectionWatch by the supplied remappingFunction.
+     *
+     * @param collectionName collection name
+     * @param remappingFunction remaps the StatefulCollectionWatch. If this returns null, the
+     *     associated StatefulCollectionWatch will be removed; otherwise, the returned value will be
+     *     assigned to such collection
+     * @return the new StatefulCollectionWatch associated with the collection
+     * @see ConcurrentHashMap#compute(Object, BiFunction)
+     */
+    private StatefulCollectionWatch compute(
+        String collectionName,
+        BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
+      return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
+    }
+  }
+
+  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+    private DocCollection currentState;
+  }
+
   public static final Set<String> KNOWN_CLUSTER_PROPS =
       Set.of(
           URL_SCHEME,
@@ -326,12 +460,17 @@ public class ZkStateReader implements SolrCloseable {
       // No need to set watchers because we should already have watchers registered for everything.
       refreshCollectionList(null);
       refreshLiveNodes(null);
-      // Need a copy so we don't delete from what we're iterating over.
-      Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
+
       Set<String> updatedCollections = new HashSet<>();
-      for (String coll : safeCopy) {
+
+      // Iterate through the actively watched collections. Note that the returned set of watched
+      // collections might change during the iteration, but iterating will not throw an exception
+      // because the set is thread-safe.
+      // If the set is modified elsewhere during the iteration, the logic below should still
+      // handle the resulting missing/extra collections without issues.
+      for (String coll : collectionWatches.watchedCollections()) {
         DocCollection newState = fetchCollectionState(coll, null);
-        if (updateWatchedCollection(coll, newState)) {
+        if (collectionWatches.updateDocCollection(coll, newState)) {
           updatedCollections.add(coll);
         }
       }
@@ -370,11 +509,11 @@ public class ZkStateReader implements SolrCloseable {
         if (ref.get() != null) {
           return;
         }
-      } else if (watchedCollectionStates.containsKey(collection)) {
+      } else if (collectionWatches.watchedCollections().contains(collection)) {
         // Exists as a watched collection, force a refresh.
         log.debug("Forcing refresh of watched collection state for {}", collection);
         DocCollection newState = fetchCollectionState(collection, null);
-        if (updateWatchedCollection(collection, newState)) {
+        if (collectionWatches.updateDocCollection(collection, newState)) {
           constructState(Collections.singleton(collection));
         }
       } else {
@@ -398,7 +537,7 @@ public class ZkStateReader implements SolrCloseable {
       DocCollection nu = getCollectionLive(coll);
       if (nu == null) return -1;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
-        if (updateWatchedCollection(coll, nu)) {
+        if (collectionWatches.updateDocCollection(coll, nu)) {
           synchronized (getUpdateLock()) {
             constructState(Collections.singleton(coll));
           }
@@ -525,8 +664,13 @@ public class ZkStateReader implements SolrCloseable {
     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
 
     // Add collections
-    for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
-      result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+    for (Entry<String, StatefulCollectionWatch> entry :
+        collectionWatches.watchedCollectionEntries()) {
+      if (entry.getValue().currentState != null) {
+        // If the DocCollection is null for the collection watch, it should not be inserted into
+        // the state
+        result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue().currentState));
+      }
     }
 
     // Finally, add any lazy collections that aren't already accounted for.
@@ -539,8 +683,8 @@ public class ZkStateReader implements SolrCloseable {
     if (log.isDebugEnabled()) {
       log.debug(
           "clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
-          collectionWatches.keySet().size(),
-          watchedCollectionStates.keySet().size(),
+          collectionWatches.watchedCollections().size(),
+          collectionWatches.activeCollectionCount(),
           lazyCollectionStates.keySet().size(),
           clusterState.getCollectionStates().size());
     }
@@ -548,8 +692,8 @@ public class ZkStateReader implements SolrCloseable {
     if (log.isTraceEnabled()) {
       log.trace(
           "clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
-          collectionWatches.keySet(),
-          watchedCollectionStates.keySet(),
+          collectionWatches.watchedCollections(),
+          collectionWatches.activeCollections(),
           lazyCollectionStates.keySet(),
           clusterState.getCollectionStates());
     }
@@ -563,7 +707,7 @@ public class ZkStateReader implements SolrCloseable {
 
   /** Refresh collections. */
   private void refreshCollections() {
-    for (String coll : collectionWatches.keySet()) {
+    for (String coll : collectionWatches.watchedCollections()) {
       new StateWatcher(coll).refreshAndWatch();
     }
   }
@@ -593,7 +737,7 @@ public class ZkStateReader implements SolrCloseable {
       this.lazyCollectionStates.keySet().retainAll(children);
       for (String coll : children) {
         // We will create an eager collection for any interesting collections, so don't add to lazy.
-        if (!collectionWatches.containsKey(coll)) {
+        if (!collectionWatches.watchedCollections().contains(coll)) {
           // Double check contains just to avoid allocating an object.
           LazyCollectionRef existing = lazyCollectionStates.get(coll);
           if (existing == null) {
@@ -642,9 +786,9 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  private Set<String> getCurrentCollections() {
+  public Set<String> getCurrentCollections() {
     Set<String> collections = new HashSet<>();
-    collections.addAll(watchedCollectionStates.keySet());
+    collections.addAll(collectionWatches.activeCollections());
     collections.addAll(lazyCollectionStates.keySet());
     return collections;
   }
@@ -1270,7 +1414,7 @@ public class ZkStateReader implements SolrCloseable {
         return;
       }
 
-      if (!collectionWatches.containsKey(coll)) {
+      if (!collectionWatches.watchedCollections().contains(coll)) {
         // This collection is no longer interesting, stop watching.
         log.debug("Uninteresting collection {}", coll);
         return;
@@ -1294,8 +1438,8 @@ public class ZkStateReader implements SolrCloseable {
 
     /**
      * Refresh collection state from ZK and leave a watch for future changes. As a side effect,
-     * updates {@link #clusterState} and {@link #watchedCollectionStates} with the results of the
-     * refresh.
+     * updates {@link #clusterState} and the cached DocCollection within {@link #collectionWatches}
+     * with the results of the refresh.
      */
     public void refreshAndWatch(EventType eventType) {
       try {
@@ -1308,7 +1452,7 @@ public class ZkStateReader implements SolrCloseable {
         }
 
         DocCollection newState = fetchCollectionState(coll, this);
-        updateWatchedCollection(coll, newState);
+        collectionWatches.updateDocCollection(coll, newState);
         synchronized (getUpdateLock()) {
           constructState(Collections.singleton(coll));
         }
@@ -1332,10 +1476,10 @@ public class ZkStateReader implements SolrCloseable {
         replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
         PerReplicaStates newStates =
             new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
-        DocCollection oldState = watchedCollectionStates.get(coll);
+        DocCollection oldState = collectionWatches.getDocCollection(coll);
         final DocCollection newState =
             oldState != null ? oldState.copyWith(newStates) : fetchCollectionState(coll, null);
-        updateWatchedCollection(coll, newState);
+        collectionWatches.updateDocCollection(coll, newState);
         synchronized (getUpdateLock()) {
           constructState(Collections.singleton(coll));
         }
@@ -1612,7 +1756,7 @@ public class ZkStateReader implements SolrCloseable {
         (k, v) -> {
           if (v == null) {
             reconstructState.set(true);
-            v = new CollectionWatch<>();
+            v = new StatefulCollectionWatch();
           }
           v.coreRefCount++;
           return v;
@@ -1640,7 +1784,6 @@ public class ZkStateReader implements SolrCloseable {
           if (v == null) return null;
           if (v.coreRefCount > 0) v.coreRefCount--;
           if (v.canBeRemoved()) {
-            watchedCollectionStates.remove(collection);
             lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
             reconstructState.set(true);
             return null;
@@ -1697,7 +1840,7 @@ public class ZkStateReader implements SolrCloseable {
         collection,
         (k, v) -> {
           if (v == null) {
-            v = new CollectionWatch<>();
+            v = new StatefulCollectionWatch();
             watchSet.set(true);
           }
           v.stateWatchers.add(stateWatcher);
@@ -1724,7 +1867,7 @@ public class ZkStateReader implements SolrCloseable {
         log.debug("update for a fresh per-replica-state {}", c.getName());
       }
       DocCollection modifiedColl = c.copyWith(newPrs);
-      updateWatchedCollection(c.getName(), modifiedColl);
+      collectionWatches.updateDocCollection(c.getName(), modifiedColl);
       return modifiedColl;
     } else {
       return c;
@@ -1919,9 +2062,9 @@ public class ZkStateReader implements SolrCloseable {
           if (v == null) return null;
           v.stateWatchers.remove(watcher);
           if (v.canBeRemoved()) {
-            watchedCollectionStates.remove(collection);
             lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
             reconstructState.set(true);
+            assert CommonTestInjection.injectDelay(); // To unit test race condition
             return null;
           }
           return v;
@@ -1947,65 +2090,6 @@ public class ZkStateReader implements SolrCloseable {
     return watchers;
   }
 
-  // returns true if the state has changed
-  private boolean updateWatchedCollection(String coll, DocCollection newState) {
-
-    if (newState == null) {
-      log.debug("Removing cached collection state for [{}]", coll);
-      watchedCollectionStates.remove(coll);
-      return true;
-    }
-
-    boolean updated = false;
-    // CAS update loop
-    while (true) {
-      if (!collectionWatches.containsKey(coll)) {
-        break;
-      }
-      DocCollection oldState = watchedCollectionStates.get(coll);
-      if (oldState == null) {
-        if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
-          if (log.isDebugEnabled()) {
-            log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
-          }
-          updated = true;
-          break;
-        }
-      } else {
-        int oldCVersion =
-            oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
-        int newCVersion =
-            newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
-        if (oldState.getZNodeVersion() >= newState.getZNodeVersion()
-            && oldCVersion >= newCVersion) {
-          // no change to state, but we might have been triggered by the addition of a
-          // state watcher, so run notifications
-          updated = true;
-          break;
-        }
-        if (watchedCollectionStates.replace(coll, oldState, newState)) {
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "Updating data for [{}] from [{}] to [{}]",
-                coll,
-                oldState.getZNodeVersion(),
-                newState.getZNodeVersion());
-          }
-          updated = true;
-          break;
-        }
-      }
-    }
-
-    // Resolve race with unregisterCore.
-    if (!collectionWatches.containsKey(coll)) {
-      watchedCollectionStates.remove(coll);
-      log.debug("Removing uninteresting collection [{}]", coll);
-    }
-
-    return updated;
-  }
-
   public void registerCollectionPropsWatcher(
       final String collection, CollectionPropsWatcher propsWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
index c837b895049..e1d7caa8dd9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
@@ -17,7 +17,10 @@
 
 package org.apache.solr.common.util;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Allows random faults to be injected in running code during test runs across all solr packages.
@@ -25,11 +28,14 @@ import java.util.Map;
  * @lucene.internal
  */
 public class CommonTestInjection {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static volatile Map<String, String> additionalSystemProps = null;
+  private static volatile Integer delay = null;
 
   public static void reset() {
     additionalSystemProps = null;
+    delay = null;
   }
 
   public static void setAdditionalProps(Map<String, String> additionalSystemProps) {
@@ -39,4 +45,32 @@ public class CommonTestInjection {
   public static Map<String, String> injectAdditionalProps() {
     return additionalSystemProps;
   }
+
+  /**
+   * Sets the artificial test delay (sleep), in milliseconds.
+   *
+   * @param delay delay in milliseconds, or null to remove the delay
+   */
+  public static void setDelay(Integer delay) {
+    CommonTestInjection.delay = delay;
+  }
+
+  /**
+   * Injects an artificial delay (sleep) into the code if a delay has been set.
+   *
+   * @return always true, so that the call can be placed inside an assert statement
+   */
+  public static boolean injectDelay() {
+    if (delay != null) {
+      try {
+        log.info("Start: artificial delay for {}ms", delay);
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        log.info("Finish: artificial delay for {}ms", delay);
+      }
+    }
+    return true;
+  }
 }