Posted to issues@solr.apache.org by GitBox <gi...@apache.org> on 2022/06/29 22:44:15 UTC

[GitHub] [solr] madrob commented on a diff in pull request #909: SOLR-16257: `ZkStateReader` changes to avoid race condition between `collectionWatches` and `watchedCollectionStates`

madrob commented on code in PR #909:
URL: https://github.com/apache/solr/pull/909#discussion_r910461025


##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+
+  public void testForciblyRefreshAllClusterState() throws Exception {
+    Path zkDir = createTempDir("testForciblyRefreshAllClusterState");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      reader.registerCore("c1"); // watching c1, so it should get non lazy reference
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      reader.forciblyRefreshAllClusterStateSlow();
+      // Initially there should be no c1 collection.
+      assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());

Review Comment:
   Can we pull this into a common setup method so the logic isn't repeated in every single test method?
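
   A possible shape for that shared fixture, sketched here under the assumption that the class follows the usual SolrTestCaseJ4 setUp/tearDown override pattern and that every test wants the same server/client/reader trio (the per-test registerCore and makePath calls would stay in the individual test methods):

     private ZkTestServer server;
     private SolrZkClient zkClient;
     private ZkStateReader reader;

     @Override
     public void setUp() throws Exception {
       super.setUp();
       Path zkDir = createTempDir("ZkStateReaderTest");
       server = new ZkTestServer(zkDir);
       server.run();
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
     }

     @Override
     public void tearDown() throws Exception {
       IOUtils.close(reader, zkClient);
       server.shutdown();
       super.tearDown();
     }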



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+
+  public void testForciblyRefreshAllClusterState() throws Exception {
+    Path zkDir = createTempDir("testForciblyRefreshAllClusterState");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      reader.registerCore("c1"); // watching c1, so it should get non lazy reference
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      reader.forciblyRefreshAllClusterStateSlow();
+      // Initially there should be no c1 collection.
+      assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+
+      // create new collection
+      DocCollection state =
+          new DocCollection(
+              "c1",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              0);
+      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+      writer.writePendingUpdates();
+
+      assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+      reader.forciblyRefreshAllClusterStateSlow();
+      ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+      assertNotNull(ref);
+      assertFalse(ref.isLazilyLoaded());
+      assertEquals(0, ref.get().getZNodeVersion());
+
+      // update the collection
+      state =
+          new DocCollection(
+              "c1",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              ref.get().getZNodeVersion());
+      wc = new ZkWriteCommand("c1", state);
+      writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+      writer.writePendingUpdates();
+
+      reader.forciblyRefreshAllClusterStateSlow();
+      ref = reader.getClusterState().getCollectionRef("c1");
+      assertNotNull(ref);
+      assertFalse(ref.isLazilyLoaded());
+      assertEquals(1, ref.get().getZNodeVersion());
+
+      // delete the collection c1, add a collection c2 that is NOT watched
+      ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+      state =
+          new DocCollection(
+              "c2",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              0);
+      ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
+
+      writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+      writer.writePendingUpdates();
+
+      reader.forciblyRefreshAllClusterStateSlow();
+      ref = reader.getClusterState().getCollectionRef("c1");
+      assertNull(ref);
+
+      ref = reader.getClusterState().getCollectionRef("c2");
+      assertNotNull(ref);
+      assert (ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched
+      assertEquals(0, ref.get().getZNodeVersion());
+    } finally {
+      IOUtils.close(reader, zkClient);
+      server.shutdown();
+    }
+  }
+
+  public void testGetCurrentCollections() throws Exception {
+    Path zkDir = createTempDir("testGetCurrentCollections");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      reader.registerCore("c1"); // listen to c1. not yet exist
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+      reader.forceUpdateCollection("c1");
+      Set<String> currentCollections = reader.getCurrentCollections();
+      assertEquals(0, currentCollections.size()); // no active collections yet
+
+      // now create both c1 (watched) and c2 (not watched)
+      DocCollection state1 =
+          new DocCollection(
+              "c1",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              0);
+      ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
+      DocCollection state2 =
+          new DocCollection(
+              "c2",
+              new HashMap<>(),
+              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+              DocRouter.DEFAULT,
+              0);
+
+      // do not listen to c2
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+      ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+      writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+      writer.writePendingUpdates();
+
+      reader.forceUpdateCollection("c1");
+      reader.forceUpdateCollection("c2");
+      // should detect both collections (c1 watched, c2 lazy loaded)
+      currentCollections = reader.getCurrentCollections();
+      assertEquals(2, currentCollections.size());
+    } finally {
+      IOUtils.close(reader, zkClient);
+      server.shutdown();
+    }
+  }
+
+  public void testWatchRaceCondition() throws Exception {
+    final int RUN_COUNT = 10000;
+    Path zkDir = createTempDir("testWatchRaceCondition");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+    ExecutorService executorService =
+        ExecutorUtil.newMDCAwareSingleThreadExecutor(
+            new SolrNamedThreadFactory("zkStateReaderTest"));
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      reader = new ZkStateReader(zkClient);
+      final ZkStateReader readerRef = reader;
+      reader.createClusterStateWatchersAndUpdate();
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      // start another thread that constantly updates the state
+      final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
+      final ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+      final AtomicInteger updateCounts = new AtomicInteger(0);
+      final AtomicReference<Exception> updateException = new AtomicReference<>();
+      executorService.submit(
+          () -> {
+            try {
+              ClusterState clusterState = readerRef.getClusterState();
+              while (!stopMutatingThread.get()) {
+                DocCollection collection = clusterState.getCollectionOrNull("c1");
+                int currentVersion = collection != null ? collection.getZNodeVersion() : 0;
+                // create new collection
+                DocCollection state =
+                    new DocCollection(
+                        "c1",
+                        new HashMap<>(),
+                        Map.of(
+                            ZkStateReader.CONFIGNAME_PROP,
+                            ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+                        DocRouter.DEFAULT,
+                        currentVersion);
+                ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+                writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+                clusterState = writer.writePendingUpdates();
+              }
+            } catch (Exception e) {
+              updateException.set(e);
+            }
+            return null;
+          });
+      executorService.shutdown();
+
+      for (int i = 0; i < RUN_COUNT; i++) {
+        final CountDownLatch latch = new CountDownLatch(2);
+
+        // remove itself on 2nd trigger
+        DocCollectionWatcher dummyWatcher =
+            collection -> {
+              latch.countDown();
+              return latch.getCount() == 0;
+            };
+        reader.registerDocCollectionWatcher("c1", dummyWatcher);
+        latch.await(10, TimeUnit.SECONDS);
+        reader.removeDocCollectionWatcher("c1", dummyWatcher);
+
+        boolean refLazilyLoaded = false;
+        for (int j = 0; j < 10; j++) {
+          if (reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()) {
+            refLazilyLoaded = true; // it should eventually be lazily loaded
+            break;
+          }
+          int attempt = j + 1;
+          log.info("ref is not lazily loaded yet. Attempt : {}", attempt);
+
+          TimeUnit.MILLISECONDS.sleep(100);
+        }

Review Comment:
   TimeOut.waitFor
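
   For reference, a sketch of that suggestion, assuming the org.apache.solr.util.TimeOut helper with its waitFor(message, condition) overload and TimeSource.NANO_TIME, and reusing the readerRef local already defined in this test (since `reader` itself is not effectively final):

        // instead of the hand-rolled poll loop: fail with a clear message after 10 seconds
        new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME)
            .waitFor(
                "c1 should eventually fall back to a lazily loaded ref",
                () -> readerRef.getClusterState().getCollectionRef("c1").isLazilyLoaded());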



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+      // start another thread that constantly updates the state
+      final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
+      final ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+      final AtomicInteger updateCounts = new AtomicInteger(0);

Review Comment:
   unused?
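
   If the counter was meant to record how many state writes the mutator performs, a minimal sketch of wiring it up (this is a guess at the intent; otherwise the field can simply be deleted):

                clusterState = writer.writePendingUpdates();
                updateCounts.incrementAndGet(); // count each flushed state update

      // ... and after stopMutatingThread.set(true):
      assertTrue("mutator thread never wrote any updates", updateCounts.get() > 0);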



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+      for (int i = 0; i < RUN_COUNT; i++) {
+        final CountDownLatch latch = new CountDownLatch(2);
+
+        // remove itself on 2nd trigger
+        DocCollectionWatcher dummyWatcher =
+            collection -> {
+              latch.countDown();
+              return latch.getCount() == 0;
+            };
+        reader.registerDocCollectionWatcher("c1", dummyWatcher);
+        latch.await(10, TimeUnit.SECONDS);

Review Comment:
   need to check return value here
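
   A minimal sketch of that check: CountDownLatch.await returns false on timeout, so the result should be asserted rather than discarded:

        assertTrue(
            "dummyWatcher was not triggered twice within 10 seconds",
            latch.await(10, TimeUnit.SECONDS));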



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -540,7 +627,7 @@ private void constructState(Set<String> changedCollections) {
       log.debug(
           "clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
           collectionWatches.keySet().size(),
-          watchedCollectionStates.keySet().size(),
+          collectionWatches.activeCollections().size(),

Review Comment:
   I would create a specialized method for this that doesn't involve collecting into an intermediate set.
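
   A hedged sketch of such a method on DocCollectionWatches; the internal field name (`currentDoc`) is an assumption, not the PR's actual code:

     /** Count of watched collections whose state has been observed, without building a Set. */
     long activeCollectionCount() {
       long count = 0;
       for (DocCollectionWatch<DocCollectionWatcher> watch : values()) {
         if (watch.currentDoc != null) { // only collections with an observed DocCollection
           count++;
         }
       }
       return count;
     }

   The log.debug call above could then pass collectionWatches.activeCollectionCount() directly.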



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+        boolean refLazilyLoaded = false;
+        for (int j = 0; j < 10; j++) {
+          if (reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()) {
+            refLazilyLoaded = true; // it should eventually be lazily loaded
+            break;
+          }
+          int attempt = j + 1;
+          log.info("ref is not lazily loaded yet. Attempt : {}", attempt);
+
+          TimeUnit.MILLISECONDS.sleep(100);
+        }
+        assert (refLazilyLoaded);

Review Comment:
   junit assert
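
   In other words (sketch): the plain `assert` keyword is a no-op unless the JVM runs with -ea, so a JUnit assertion is the safer choice here:

        assertTrue("c1 should eventually be lazily loaded again", refLazilyLoaded);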



##########
solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java:
##########
@@ -196,4 +209,251 @@ public void testWatchedCollectionCreation() throws Exception {
       server.shutdown();
     }
   }
+        assert (refLazilyLoaded);
+      }
+
+      stopMutatingThread.set(true);
+      if (updateException.get() != null) {
+        throw (updateException.get());
+      }
+
+    } finally {
+      IOUtils.close(reader, zkClient);
+      executorService.awaitTermination(10, TimeUnit.SECONDS);

Review Comment:
   ExecutorUtil.awaitTermination
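
   A sketch of the suggested cleanup, assuming the org.apache.solr.common.util.ExecutorUtil helper; the trailing server.shutdown() is assumed to follow as in the other tests:

    } finally {
      IOUtils.close(reader, zkClient);
      // blocks until the mutator task has fully exited, instead of capping at 10 seconds
      ExecutorUtil.awaitTermination(executorService);
      server.shutdown();
    }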



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -1640,7 +1727,6 @@ public void unregisterCore(String collection) {
           if (v == null) return null;
           if (v.coreRefCount > 0) v.coreRefCount--;
           if (v.canBeRemoved()) {
-            watchedCollectionStates.remove(collection);
             lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
             reconstructState.set(true);
             return null;

Review Comment:
   it removes the entry
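
   Presumably this refers to the enclosing compute() call: returning null from the remapping function already removes the collection's entry from the map. A minimal standalone fragment illustrating that ConcurrentHashMap behavior (not the PR's code):

    ConcurrentHashMap<String, Integer> refCounts = new ConcurrentHashMap<>();
    refCounts.put("c1", 1);
    refCounts.compute("c1", (collection, count) -> null); // returning null deletes the mapping
    // refCounts.containsKey("c1") is now false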



##########
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -246,6 +244,90 @@ public boolean canBeRemoved() {
     }
   }
 
+  /**
+   * A ConcurrentHashMap of active watchers, keyed by collection name
+   *
+   * <p>Each DocCollectionWatch also contains the latest observed DocCollection (state)
+   */
+  private static class DocCollectionWatches
+      extends ConcurrentHashMap<String, DocCollectionWatch<DocCollectionWatcher>> {

Review Comment:
   extending CHM can get kind of trappy, it's usually a better idea to encapsulate a CHM as a field inside of this class and then create delegate methods where necessary.
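
   A hedged sketch of the composition approach being suggested (class shape and method set are illustrative, not the PR's actual code; assumes imports for ConcurrentHashMap, Function, Collections and Set):

     private static class DocCollectionWatches {
       // encapsulated rather than extended: only the operations callers actually need are exposed
       private final ConcurrentHashMap<String, DocCollectionWatch<DocCollectionWatcher>> watches =
           new ConcurrentHashMap<>();

       DocCollectionWatch<DocCollectionWatcher> computeIfAbsent(
           String collection, Function<String, DocCollectionWatch<DocCollectionWatcher>> fn) {
         return watches.computeIfAbsent(collection, fn);
       }

       DocCollectionWatch<DocCollectionWatcher> get(String collection) {
         return watches.get(collection);
       }

       Set<String> watchedCollections() {
         return Collections.unmodifiableSet(watches.keySet());
       }
     }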



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org
For additional commands, e-mail: issues-help@solr.apache.org