You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2022/07/20 19:01:13 UTC
[solr] branch main updated: SOLR-16257: Improve ZkStateReader to avoid race condition (#909)
This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 97458a503bc SOLR-16257: Improve ZkStateReader to avoid race condition (#909)
97458a503bc is described below
commit 97458a503bc0e5a4f52c0dc4b211d4d79cd1bccd
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Wed Jul 20 12:01:08 2022 -0700
SOLR-16257: Improve ZkStateReader to avoid race condition (#909)
A race condition existed between collectionWatches and watchedCollectionStates.
---
solr/CHANGES.txt | 3 +
.../solr/cloud/overseer/ZkStateReaderTest.java | 613 ++++++++++++++++-----
.../apache/solr/common/cloud/ZkStateReader.java | 272 +++++----
.../solr/common/util/CommonTestInjection.java | 34 ++
4 files changed, 696 insertions(+), 226 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 777facf0372..e18d37d7492 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -59,6 +59,9 @@ Improvements
* SOLR-16225: Upgrade dependencies (Carrot2, HPPC) (Dawid Weiss)
+* SOLR-16257: Improve ZkStateReader to avoid race condition between collectionWatches and watchedCollectionStates
+ (Patson Luk, Houston Putman, Mike Drob)
+
Optimizations
---------------------
* SOLR-16120: Optimise hl.fl expansion. (Christine Poerschke, David Smiley, Mike Drob)
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 639c7538a46..ac34bfaa1ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -16,11 +16,20 @@
*/
package org.apache.solr.cloud.overseer;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
@@ -29,171 +38,511 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.CommonTestInjection;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkStateReaderTest extends SolrTestCaseJ4 {
-
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long TIMEOUT = 30;
- public void testExternalCollectionWatchedNotWatched() throws Exception {
- Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
- ZkTestServer server = new ZkTestServer(zkDir);
- SolrZkClient zkClient = null;
- ZkStateReader reader = null;
-
- try {
- server.run();
-
- zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
- ZkController.createClusterZkNodes(zkClient);
-
- reader = new ZkStateReader(zkClient);
- reader.createClusterStateWatchersAndUpdate();
-
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
- // create new collection
- ZkWriteCommand c1 =
- new ZkWriteCommand(
- "c1",
- new DocCollection(
- "c1",
- new HashMap<>(),
- Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
- DocRouter.DEFAULT,
- 0));
- writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
- writer.writePendingUpdates();
- reader.forceUpdateCollection("c1");
-
- assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
- reader.registerCore("c1");
- assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
- reader.unregisterCore("c1");
- assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+ private static class TestFixture implements Closeable {
+ private final ZkTestServer server;
+ private final SolrZkClient zkClient;
+ private final ZkStateReader reader;
+ private final ZkStateWriter writer;
+
+ private TestFixture(
+ ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) {
+ this.server = server;
+ this.zkClient = zkClient;
+ this.reader = reader;
+ this.writer = writer;
+ }
- } finally {
+ @Override
+ public void close() throws IOException {
IOUtils.close(reader, zkClient);
- server.shutdown();
+ try {
+ server.shutdown();
+ } catch (InterruptedException e) {
+ // ok. Shutting down anyway
+ }
}
}
- public void testCollectionStateWatcherCaching() throws Exception {
- Path zkDir = createTempDir("testCollectionStateWatcherCaching");
-
- ZkTestServer server = new ZkTestServer(zkDir);
+ private TestFixture fixture = null;
- SolrZkClient zkClient = null;
- ZkStateReader reader = null;
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ fixture = setupTestFixture(getTestName());
+ }
- try {
- server.run();
-
- zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
- ZkController.createClusterZkNodes(zkClient);
-
- reader = new ZkStateReader(zkClient);
- reader.createClusterStateWatchersAndUpdate();
-
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
- DocCollection state =
- new DocCollection(
- "c1",
- new HashMap<>(),
- Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
- DocRouter.DEFAULT,
- 0);
- ZkWriteCommand wc = new ZkWriteCommand("c1", state);
- writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
- writer.writePendingUpdates();
- assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
- reader.waitForState(
- "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
-
- Map<String, Object> props = new HashMap<>();
- props.put("x", "y");
- props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
- state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
- wc = new ZkWriteCommand("c1", state);
- writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
- writer.writePendingUpdates();
-
- boolean found = false;
- TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while (!timeOut.hasTimedOut()) {
- DocCollection c1 = reader.getClusterState().getCollection("c1");
- if ("y".equals(c1.getStr("x"))) {
- found = true;
- break;
- }
- }
- assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
- } finally {
- IOUtils.close(reader, zkClient);
- server.shutdown();
+ @After
+ public void tearDown() throws Exception {
+ if (fixture != null) {
+ fixture.close();
}
+ super.tearDown();
}
- public void testWatchedCollectionCreation() throws Exception {
- Path zkDir = createTempDir("testWatchedCollectionCreation");
-
+ private static TestFixture setupTestFixture(String testPrefix) throws Exception {
+ Path zkDir = createTempDir(testPrefix);
ZkTestServer server = new ZkTestServer(zkDir);
+ server.run();
+ SolrZkClient zkClient =
+ new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
- SolrZkClient zkClient = null;
- ZkStateReader reader = null;
+ ZkStateReader reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
- try {
- server.run();
+ ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
- zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
- ZkController.createClusterZkNodes(zkClient);
+ return new TestFixture(server, zkClient, reader, writer);
+ }
- reader = new ZkStateReader(zkClient);
- reader.createClusterStateWatchersAndUpdate();
- reader.registerCore("c1");
+ public void testExternalCollectionWatchedNotWatched() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ // create new collection
+ ZkWriteCommand c1 =
+ new ZkWriteCommand(
+ "c1",
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0));
+
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+ writer.writePendingUpdates();
+ reader.forceUpdateCollection("c1");
+
+ assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+ reader.registerCore("c1");
+ assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+ reader.unregisterCore("c1");
+ assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+ }
- // Initially there should be no c1 collection.
- assertNull(reader.getClusterState().getCollectionRef("c1"));
+ public void testCollectionStateWatcherCaching() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+ assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+ reader.waitForState(
+ "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
+
+ Map<String, Object> props = new HashMap<>();
+ props.put("x", "y");
+ props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
+ state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
+ wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ boolean found = false;
+ TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (!timeOut.hasTimedOut()) {
+ DocCollection c1 = reader.getClusterState().getCollection("c1");
+ if ("y".equals(c1.getStr("x"))) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
+ }
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
- reader.forceUpdateCollection("c1");
+ public void testWatchedCollectionCreation() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ reader.registerCore("c1");
+
+ // Initially there should be no c1 collection.
+ assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ reader.forceUpdateCollection("c1");
+
+ // Still no c1 collection, despite a collection path.
+ assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+ // create new collection
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+ // reader.forceUpdateCollection("c1");
+ reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
+ ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+ assertNotNull(ref);
+ assertFalse(ref.isLazilyLoaded());
+ }
- // Still no c1 collection, despite a collection path.
- assertNull(reader.getClusterState().getCollectionRef("c1"));
+ /**
+ * Verifies that znode and child versions are correct and version changes trigger cluster state
+ * updates
+ */
+ public void testNodeVersion() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ ClusterState clusterState = reader.getClusterState();
+ // create new collection
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+
+ // have to register it here after the updates, otherwise the child node watch will not be
+ // inserted
+ reader.registerCore("c1");
+
+ TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ timeOut.waitFor(
+ "Timeout on waiting for c1 to show up in cluster state",
+ () ->
+ reader.getClusterState().getCollectionRef("c1") != null
+ && reader.getClusterState().getCollectionRef("c1").get() != null);
+
+ ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(0, ref.get().getZNodeVersion());
+ assertEquals(-1, ref.get().getChildNodesVersion());
+
+ DocCollection collection = ref.get();
+ PerReplicaStates prs =
+ PerReplicaStates.fetch(
+ collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
+ PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
+ .persist(collection.getZNode(), fixture.zkClient);
+ timeOut.waitFor(
+ "Timeout on waiting for c1 updated to have PRS state r1",
+ () -> {
+ DocCollection c = reader.getCollection("c1");
+ return c.getPerReplicaStates() != null
+ && c.getPerReplicaStates().get("r1") != null
+ && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
+ });
+
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
+ assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now
+
+ prs = ref.get().getPerReplicaStates();
+ PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
+ .persist(collection.getZNode(), fixture.zkClient);
+ timeOut.waitFor(
+ "Timeout on waiting for c1 updated to have PRS state r1 marked as ACTIVE",
+ () ->
+ reader.getCollection("c1").getPerReplicaStates().get("r1").state
+ == Replica.State.ACTIVE);
+
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
+ // but child version should be 3 now (1 del + 1 add)
+ assertEquals(3, ref.get().getChildNodesVersion());
+
+ // now delete the collection
+ wc = new ZkWriteCommand("c1", null);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+ timeOut.waitFor(
+ "Timeout on waiting for c1 to be removed from cluster state",
+ () -> reader.getClusterState().getCollectionOrNull("c1") == null);
+
+ reader.unregisterCore("c1");
+ // re-add the same collection
+ wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+ // re-register, otherwise the child watch would be missing from collection deletion
+ reader.registerCore("c1");
+
+ // reader.forceUpdateCollection("c1");
+ timeOut.waitFor(
+ "Timeout on waiting for c1 to show up in cluster state again",
+ () ->
+ reader.getClusterState().getCollectionRef("c1") != null
+ && reader.getClusterState().getCollectionRef("c1").get() != null);
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(0, ref.get().getZNodeVersion());
+ assertEquals(-1, ref.get().getChildNodesVersion()); // child node version is reset
+
+ // re-add PRS
+ collection = ref.get();
+ prs =
+ PerReplicaStates.fetch(
+ collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
+ PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
+ .persist(collection.getZNode(), fixture.zkClient);
+ timeOut.waitFor(
+ "Timeout on waiting for c1 updated to have PRS state r1",
+ () -> {
+ DocCollection c = reader.getCollection("c1");
+ return c.getPerReplicaStates() != null
+ && c.getPerReplicaStates().get("r1") != null
+ && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
+ });
+
+ ref = reader.getClusterState().getCollectionRef("c1");
+
+ // child version should be reset since the state.json node was deleted and re-created
+ assertEquals(1, ref.get().getChildNodesVersion());
+ }
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ public void testForciblyRefreshAllClusterState() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ reader.registerCore("c1"); // watching c1, so it should get non lazy reference
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ // Initially there should be no c1 collection.
+ assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+ // create new collection
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+ assertNotNull(ref);
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(0, ref.get().getZNodeVersion());
+
+ // update the collection
+ state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ ref.get().getZNodeVersion());
+ wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertNotNull(ref);
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(1, ref.get().getZNodeVersion());
+
+ // delete the collection c1, add a collection c2 that is NOT watched
+ ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);
+
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ state =
+ new DocCollection(
+ "c2",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
+
+ writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+ writer.writePendingUpdates();
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertNull(ref);
+
+ ref = reader.getClusterState().getCollectionRef("c2");
+ assertNotNull(ref);
+ assertTrue(
+ "c2 should have been lazily loaded but is not!",
+ ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched
+ assertEquals(0, ref.get().getZNodeVersion());
+ }
- // create new collection
- DocCollection state =
- new DocCollection(
- "c1",
- new HashMap<>(),
- Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
- DocRouter.DEFAULT,
- 0);
- ZkWriteCommand wc = new ZkWriteCommand("c1", state);
- writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
- writer.writePendingUpdates();
+ public void testGetCurrentCollections() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ reader.registerCore("c1"); // listen to c1. not yet exist
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ reader.forceUpdateCollection("c1");
+ Set<String> currentCollections = reader.getCurrentCollections();
+ assertEquals(0, currentCollections.size()); // no active collections yet
+
+ // now create both c1 (watched) and c2 (not watched)
+ DocCollection state1 =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
+ DocCollection state2 =
+ new DocCollection(
+ "c2",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+
+ // do not listen to c2
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);
+
+ writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
+ writer.writePendingUpdates();
+
+ reader.forceUpdateCollection("c1");
+ reader.forceUpdateCollection("c2");
+
+ // should detect both collections (c1 watched, c2 lazy loaded)
+ currentCollections = reader.getCurrentCollections();
+ assertEquals(2, currentCollections.size());
+ }
- assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+ /**
+ * Simulates race condition that might arise when state updates triggered by watch notification
+ * contend with removal of collection watches.
+ *
+ * <p>Such a race condition should no longer exist with the new code that uses a single map for both
+ * "collection watches" and "latest state of watched collection"
+ */
+ public void testWatchRaceCondition() throws Exception {
+ ExecutorService executorService =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(
+ new SolrNamedThreadFactory("zkStateReaderTest"));
+ CommonTestInjection.setDelay(1000);
+ final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
+ try {
+ ZkStateWriter writer = fixture.writer;
+ final ZkStateReader reader = fixture.reader;
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ // start another thread to constantly update the state
+ final AtomicReference<Exception> updateException = new AtomicReference<>();
+ executorService.submit(
+ () -> {
+ try {
+ ClusterState clusterState = reader.getClusterState();
+ while (!stopMutatingThread.get()) {
+ DocCollection collection = clusterState.getCollectionOrNull("c1");
+ int currentVersion = collection != null ? collection.getZNodeVersion() : 0;
+ // create new collection
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(
+ ZkStateReader.CONFIGNAME_PROP,
+ ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ currentVersion);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ } catch (Exception e) {
+ updateException.set(e);
+ }
+ return null;
+ });
+ executorService.shutdown();
- // reader.forceUpdateCollection("c1");
- reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
- ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
- assertNotNull(ref);
- assertFalse(ref.isLazilyLoaded());
+ reader.waitForState(
+ "c1",
+ 10,
+ TimeUnit.SECONDS,
+ slices -> slices != null); // wait for the state to become available
+
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ // remove itself on 2nd trigger
+ DocCollectionWatcher dummyWatcher =
+ collection -> {
+ latch.countDown();
+ return latch.getCount() == 0;
+ };
+ reader.registerDocCollectionWatcher("c1", dummyWatcher);
+ assertTrue(
+ "Missing expected collection updates after the wait", latch.await(10, TimeUnit.SECONDS));
+ reader.removeDocCollectionWatcher("c1", dummyWatcher);
+
+ // cluster state might not be updated right away after the removeDocCollectionWatcher call
+ // above as org.apache.solr.common.cloud.ZkStateReader.Notification might remove the watcher
+ // as well and might still be in the middle of updating the cluster state.
+ TimeOut timeOut = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ timeOut.waitFor(
+ "The ref is not lazily loaded after waiting",
+ () -> reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+
+ if (updateException.get() != null) {
+ throw (updateException.get());
+ }
} finally {
- IOUtils.close(reader, zkClient);
- server.shutdown();
+ stopMutatingThread.set(true);
+ CommonTestInjection.reset();
+ ExecutorUtil.awaitTermination(executorService);
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index a2f9864525b..353500581c2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -23,7 +23,6 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -55,6 +55,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
@@ -171,10 +172,6 @@ public class ZkStateReader implements SolrCloseable {
public static final String SHARD_LEADERS_ZKNODE = "leaders";
public static final String ELECTION_NODE = "election";
- /** "Interesting" and actively watched Collections. */
- private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates =
- new ConcurrentHashMap<>();
-
/** "Interesting" but not actively watched Collections. */
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates =
new ConcurrentHashMap<>();
@@ -195,8 +192,11 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener;
- private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches =
- new ConcurrentHashMap<>();
+ /**
+ * Collections with active watches. The {@link StatefulCollectionWatch} of each collection also
+ * holds the latest DocCollection (state) it observed (null while the collection does not exist)
+ */
+ private DocCollectionWatches collectionWatches = new DocCollectionWatches();
// named this observers so there's less confusion between CollectionPropsWatcher map and the
// PropsWatcher map.
@@ -246,6 +246,140 @@ public class ZkStateReader implements SolrCloseable {
}
}
+ /**
+ * A ConcurrentHashMap of active watchers keyed by collection name
+ *
+ * <p>Each StatefulCollectionWatch also holds the latest DocCollection (state) observed
+ */
+ private static class DocCollectionWatches {
+ private final ConcurrentHashMap<String, StatefulCollectionWatch>
+ statefulWatchesByCollectionName = new ConcurrentHashMap<>();
+
+ /**
+ * Gets the DocCollection (state) of the collection which the corresponding watch last observed
+ *
+ * @param collection the collection name to get DocCollection on
+ * @return The last observed DocCollection (state), or null if the collection is not watched or
+ * does not currently exist.
+ */
+ private DocCollection getDocCollection(String collection) {
+ StatefulCollectionWatch watch = statefulWatchesByCollectionName.get(collection);
+ return watch != null ? watch.currentState : null;
+ }
+
+ /**
+ * Gets the active collections (collections that exist) being watched
+ *
+ * @return an immutable set of active collection names
+ */
+ private Set<String> activeCollections() {
+ return statefulWatchesByCollectionName.entrySet().stream()
+ .filter(
+ (Entry<String, StatefulCollectionWatch> entry) ->
+ entry.getValue().currentState != null)
+ .map(Entry::getKey)
+ .collect(Collectors.toUnmodifiableSet());
+ }
+
+ /**
+ * Gets the count of active collections (collections that exist) being watched
+ *
+ * @return the count of active collections
+ */
+ private long activeCollectionCount() {
+ return statefulWatchesByCollectionName.entrySet().stream()
+ .filter(
+ (Entry<String, StatefulCollectionWatch> entry) ->
+ entry.getValue().currentState != null)
+ .count();
+ }
+
+ /**
+ * Gets an unmodifiable, live view of the watched collection names (a ConcurrentHashMap keySet).
+ *
+ * @return view of watched collection names; safe to iterate while the map is being modified
+ */
+ private Set<String> watchedCollections() {
+ return Collections.unmodifiableSet(statefulWatchesByCollectionName.keySet());
+ }
+
+ private Set<Entry<String, StatefulCollectionWatch>> watchedCollectionEntries() {
+ return Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet());
+ }
+
+ /**
+ * Updates the latest observed DocCollection (state) of the {@link StatefulCollectionWatch} if
+ * the collection is being watched
+ *
+ * @param collection the collection name
+ * @param newState the new DocCollection (state) observed
+ * @return true if the watch's cached state was replaced; false if unchanged or not watched
+ */
+ private boolean updateDocCollection(String collection, DocCollection newState) {
+ AtomicBoolean stateHasChanged = new AtomicBoolean(false);
+ statefulWatchesByCollectionName.computeIfPresent(
+ collection,
+ (col, watch) -> {
+ DocCollection oldState = watch.currentState;
+ if (oldState == null && newState == null) {
+ // OK, the collection does not exist in ZK yet or was already deleted
+ } else if (oldState == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Add data for [{}] ver [{}]", collection, newState.getZNodeVersion());
+ }
+ watch.currentState = newState;
+ } else if (newState == null) {
+ log.debug("Removing cached collection state for [{}]", collection);
+ watch.currentState = null;
+ } else { // both new and old states are non-null
+ int oldCVersion =
+ oldState.getPerReplicaStates() == null
+ ? -1
+ : oldState.getPerReplicaStates().cversion;
+ int newCVersion =
+ newState.getPerReplicaStates() == null
+ ? -1
+ : newState.getPerReplicaStates().cversion;
+ if (oldState.getZNodeVersion() < newState.getZNodeVersion()
+ || oldCVersion < newCVersion) {
+ watch.currentState = newState;
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Updating data for [{}] from [{}] to [{}]",
+ collection,
+ oldState.getZNodeVersion(),
+ newState.getZNodeVersion());
+ }
+ }
+ }
+ stateHasChanged.set(oldState != watch.currentState);
+ return watch;
+ });
+
+ return stateHasChanged.get();
+ }
+
+ /**
+ * Computes the new StatefulCollectionWatch by the supplied remappingFunction.
+ *
+ * @param collectionName collection name
+ * @param remappingFunction remaps the StatefulCollectionWatch. If this returns null, the
+ * associated StatefulCollectionWatch will be removed; otherwise, the returned value will be
+ * assigned to such collection
+ * @return the new StatefulCollectionWatch associated with the collection
+ * @see ConcurrentHashMap#compute(Object, BiFunction)
+ */
+ private StatefulCollectionWatch compute(
+ String collectionName,
+ BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
+ return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
+ }
+ }
+
+ private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+ private DocCollection currentState;
+ }
+
public static final Set<String> KNOWN_CLUSTER_PROPS =
Set.of(
URL_SCHEME,
@@ -326,12 +460,17 @@ public class ZkStateReader implements SolrCloseable {
// No need to set watchers because we should already have watchers registered for everything.
refreshCollectionList(null);
refreshLiveNodes(null);
- // Need a copy so we don't delete from what we're iterating over.
- Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
+
Set<String> updatedCollections = new HashSet<>();
- for (String coll : safeCopy) {
+
+ // Iterate through the actively watched collections. Take note that the returned watched
+ // collections might change during the iteration, but it should not throw exception as
+ // it's thread-safe.
+ // If such set is modified elsewhere during the iteration, the code logic should still
+ // handle such missing/extra collection w/o issues.
+ for (String coll : collectionWatches.watchedCollections()) {
DocCollection newState = fetchCollectionState(coll, null);
- if (updateWatchedCollection(coll, newState)) {
+ if (collectionWatches.updateDocCollection(coll, newState)) {
updatedCollections.add(coll);
}
}
@@ -370,11 +509,11 @@ public class ZkStateReader implements SolrCloseable {
if (ref.get() != null) {
return;
}
- } else if (watchedCollectionStates.containsKey(collection)) {
+ } else if (collectionWatches.watchedCollections().contains(collection)) {
// Exists as a watched collection, force a refresh.
log.debug("Forcing refresh of watched collection state for {}", collection);
DocCollection newState = fetchCollectionState(collection, null);
- if (updateWatchedCollection(collection, newState)) {
+ if (collectionWatches.updateDocCollection(collection, newState)) {
constructState(Collections.singleton(collection));
}
} else {
@@ -398,7 +537,7 @@ public class ZkStateReader implements SolrCloseable {
DocCollection nu = getCollectionLive(coll);
if (nu == null) return -1;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
- if (updateWatchedCollection(coll, nu)) {
+ if (collectionWatches.updateDocCollection(coll, nu)) {
synchronized (getUpdateLock()) {
constructState(Collections.singleton(coll));
}
@@ -525,8 +664,13 @@ public class ZkStateReader implements SolrCloseable {
Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
// Add collections
- for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
- result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+ for (Entry<String, StatefulCollectionWatch> entry :
+ collectionWatches.watchedCollectionEntries()) {
+ if (entry.getValue().currentState != null) {
+ // if the doc is null for the collection watch, then it should not be inserted into the
+ // state
+ result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue().currentState));
+ }
}
// Finally, add any lazy collections that aren't already accounted for.
@@ -539,8 +683,8 @@ public class ZkStateReader implements SolrCloseable {
if (log.isDebugEnabled()) {
log.debug(
"clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
- collectionWatches.keySet().size(),
- watchedCollectionStates.keySet().size(),
+ collectionWatches.watchedCollections().size(),
+ collectionWatches.activeCollectionCount(),
lazyCollectionStates.keySet().size(),
clusterState.getCollectionStates().size());
}
@@ -548,8 +692,8 @@ public class ZkStateReader implements SolrCloseable {
if (log.isTraceEnabled()) {
log.trace(
"clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
- collectionWatches.keySet(),
- watchedCollectionStates.keySet(),
+ collectionWatches.watchedCollections(),
+ collectionWatches.activeCollections(),
lazyCollectionStates.keySet(),
clusterState.getCollectionStates());
}
@@ -563,7 +707,7 @@ public class ZkStateReader implements SolrCloseable {
/** Refresh collections. */
private void refreshCollections() {
- for (String coll : collectionWatches.keySet()) {
+ for (String coll : collectionWatches.watchedCollections()) {
new StateWatcher(coll).refreshAndWatch();
}
}
@@ -593,7 +737,7 @@ public class ZkStateReader implements SolrCloseable {
this.lazyCollectionStates.keySet().retainAll(children);
for (String coll : children) {
// We will create an eager collection for any interesting collections, so don't add to lazy.
- if (!collectionWatches.containsKey(coll)) {
+ if (!collectionWatches.watchedCollections().contains(coll)) {
// Double check contains just to avoid allocating an object.
LazyCollectionRef existing = lazyCollectionStates.get(coll);
if (existing == null) {
@@ -642,9 +786,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
- private Set<String> getCurrentCollections() {
+ public Set<String> getCurrentCollections() {
Set<String> collections = new HashSet<>();
- collections.addAll(watchedCollectionStates.keySet());
+ collections.addAll(collectionWatches.activeCollections());
collections.addAll(lazyCollectionStates.keySet());
return collections;
}
@@ -1270,7 +1414,7 @@ public class ZkStateReader implements SolrCloseable {
return;
}
- if (!collectionWatches.containsKey(coll)) {
+ if (!collectionWatches.watchedCollections().contains(coll)) {
// This collection is no longer interesting, stop watching.
log.debug("Uninteresting collection {}", coll);
return;
@@ -1294,8 +1438,8 @@ public class ZkStateReader implements SolrCloseable {
/**
* Refresh collection state from ZK and leave a watch for future changes. As a side effect,
- * updates {@link #clusterState} and {@link #watchedCollectionStates} with the results of the
- * refresh.
+ * updates {@link #clusterState} and collection ref within {@link #collectionWatches} with the
+ * results of the refresh.
*/
public void refreshAndWatch(EventType eventType) {
try {
@@ -1308,7 +1452,7 @@ public class ZkStateReader implements SolrCloseable {
}
DocCollection newState = fetchCollectionState(coll, this);
- updateWatchedCollection(coll, newState);
+ collectionWatches.updateDocCollection(coll, newState);
synchronized (getUpdateLock()) {
constructState(Collections.singleton(coll));
}
@@ -1332,10 +1476,10 @@ public class ZkStateReader implements SolrCloseable {
replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
PerReplicaStates newStates =
new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
- DocCollection oldState = watchedCollectionStates.get(coll);
+ DocCollection oldState = collectionWatches.getDocCollection(coll);
final DocCollection newState =
oldState != null ? oldState.copyWith(newStates) : fetchCollectionState(coll, null);
- updateWatchedCollection(coll, newState);
+ collectionWatches.updateDocCollection(coll, newState);
synchronized (getUpdateLock()) {
constructState(Collections.singleton(coll));
}
@@ -1612,7 +1756,7 @@ public class ZkStateReader implements SolrCloseable {
(k, v) -> {
if (v == null) {
reconstructState.set(true);
- v = new CollectionWatch<>();
+ v = new StatefulCollectionWatch();
}
v.coreRefCount++;
return v;
@@ -1640,7 +1784,6 @@ public class ZkStateReader implements SolrCloseable {
if (v == null) return null;
if (v.coreRefCount > 0) v.coreRefCount--;
if (v.canBeRemoved()) {
- watchedCollectionStates.remove(collection);
lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
reconstructState.set(true);
return null;
@@ -1697,7 +1840,7 @@ public class ZkStateReader implements SolrCloseable {
collection,
(k, v) -> {
if (v == null) {
- v = new CollectionWatch<>();
+ v = new StatefulCollectionWatch();
watchSet.set(true);
}
v.stateWatchers.add(stateWatcher);
@@ -1724,7 +1867,7 @@ public class ZkStateReader implements SolrCloseable {
log.debug("update for a fresh per-replica-state {}", c.getName());
}
DocCollection modifiedColl = c.copyWith(newPrs);
- updateWatchedCollection(c.getName(), modifiedColl);
+ collectionWatches.updateDocCollection(c.getName(), modifiedColl);
return modifiedColl;
} else {
return c;
@@ -1919,9 +2062,9 @@ public class ZkStateReader implements SolrCloseable {
if (v == null) return null;
v.stateWatchers.remove(watcher);
if (v.canBeRemoved()) {
- watchedCollectionStates.remove(collection);
lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
reconstructState.set(true);
+ assert CommonTestInjection.injectDelay(); // To unit test race condition
return null;
}
return v;
@@ -1947,65 +2090,6 @@ public class ZkStateReader implements SolrCloseable {
return watchers;
}
- // returns true if the state has changed
- private boolean updateWatchedCollection(String coll, DocCollection newState) {
-
- if (newState == null) {
- log.debug("Removing cached collection state for [{}]", coll);
- watchedCollectionStates.remove(coll);
- return true;
- }
-
- boolean updated = false;
- // CAS update loop
- while (true) {
- if (!collectionWatches.containsKey(coll)) {
- break;
- }
- DocCollection oldState = watchedCollectionStates.get(coll);
- if (oldState == null) {
- if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
- if (log.isDebugEnabled()) {
- log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
- }
- updated = true;
- break;
- }
- } else {
- int oldCVersion =
- oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
- int newCVersion =
- newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
- if (oldState.getZNodeVersion() >= newState.getZNodeVersion()
- && oldCVersion >= newCVersion) {
- // no change to state, but we might have been triggered by the addition of a
- // state watcher, so run notifications
- updated = true;
- break;
- }
- if (watchedCollectionStates.replace(coll, oldState, newState)) {
- if (log.isDebugEnabled()) {
- log.debug(
- "Updating data for [{}] from [{}] to [{}]",
- coll,
- oldState.getZNodeVersion(),
- newState.getZNodeVersion());
- }
- updated = true;
- break;
- }
- }
- }
-
- // Resolve race with unregisterCore.
- if (!collectionWatches.containsKey(coll)) {
- watchedCollectionStates.remove(coll);
- log.debug("Removing uninteresting collection [{}]", coll);
- }
-
- return updated;
- }
-
public void registerCollectionPropsWatcher(
final String collection, CollectionPropsWatcher propsWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
index c837b895049..e1d7caa8dd9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
@@ -17,7 +17,10 @@
package org.apache.solr.common.util;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Allows random faults to be injected in running code during test runs across all solr packages.
@@ -25,11 +28,14 @@ import java.util.Map;
* @lucene.internal
*/
public class CommonTestInjection {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static volatile Map<String, String> additionalSystemProps = null;
+ /** Artificial delay in milliseconds applied by injectDelay(); null means no delay. */
+ private static volatile Integer delay = null;
+ /** Clears all injected test behaviour: the additional system properties and the delay. */
public static void reset() {
additionalSystemProps = null;
+ delay = null;
}
public static void setAdditionalProps(Map<String, String> additionalSystemProps) {
@@ -39,4 +45,32 @@ public class CommonTestInjection {
+ /** @return the injected additional system properties, or null when none are currently set */
public static Map<String, String> injectAdditionalProps() {
return additionalSystemProps;
}
+
+  /**
+   * Sets the artificial delay (sleep) applied by {@link #injectDelay()}.
+   *
+   * @param delay delay in milliseconds, or null to remove the delay
+   * @throws IllegalArgumentException if {@code delay} is negative. {@link Thread#sleep(long)}
+   *     would otherwise reject it later, from inside the code under test
+   */
+  public static void setDelay(Integer delay) {
+    if (delay != null && delay < 0) {
+      throw new IllegalArgumentException("delay must be non-negative, got " + delay);
+    }
+    CommonTestInjection.delay = delay;
+  }
+
+  /**
+   * Injects an artificial delay (sleep) into the calling code when one has been configured via
+   * {@link #setDelay(Integer)}; does nothing otherwise. Intended to be wrapped in an assert
+   * statement ({@code assert CommonTestInjection.injectDelay();}) so it only executes when
+   * assertions are enabled.
+   *
+   * <p>If the sleeping thread is interrupted, its interrupt flag is restored and the method
+   * returns normally.
+   *
+   * @return always {@code true}, so the call can be used as an assert condition
+   */
+  public static boolean injectDelay() {
+    // Read the volatile field exactly once. A concurrent reset() or setDelay(null) between a null
+    // check and Thread.sleep(delay) would otherwise auto-unbox null and throw a
+    // NullPointerException.
+    final Integer delayMs = delay;
+    if (delayMs != null) {
+      try {
+        log.info("Start: artificial delay for {}ms", delayMs);
+        Thread.sleep(delayMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        log.info("Finish: artificial delay for {}ms", delayMs);
+      }
+    }
+    return true;
+  }
}