You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/06/17 16:59:53 UTC
[lucene-solr] branch master updated: SOLR-13490: Fix
CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader
and CloudSolrClient to be triggered on liveNode changes.
This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 5a97486 SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.
5a97486 is described below
commit 5a974860fa83408a86ca64b417f3111b037da7eb
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Jun 17 09:59:43 2019 -0700
SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.
Also add Predicate<DocCollection> equivalents for callers that don't care about liveNodes.
---
solr/CHANGES.txt | 4 +
.../java/org/apache/solr/cloud/ZkController.java | 14 +-
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 2 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 2 +-
.../OverseerCollectionMessageHandler.java | 2 +-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 6 +-
.../cloud/TestWaitForStateWithJettyShutdowns.java | 155 +++++++++++
.../solr/common/cloud/ZkStateReaderAccessor.java | 2 +-
.../client/solrj/impl/BaseCloudSolrClient.java | 68 ++++-
.../common/cloud/CollectionStatePredicate.java | 10 +-
.../solr/common/cloud/CollectionStateWatcher.java | 9 +-
...StateWatcher.java => DocCollectionWatcher.java} | 17 +-
.../apache/solr/common/cloud/ZkStateReader.java | 202 ++++++++++++--
.../common/cloud/TestCollectionStateWatchers.java | 180 ++++++++-----
.../common/cloud/TestDocCollectionWatcher.java | 291 +++++++++++++++++++++
.../solr/cloud/AbstractDistribZkTestBase.java | 6 +-
.../solr/cloud/AbstractFullDistribZkTestBase.java | 4 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 2 +-
19 files changed, 856 insertions(+), 122 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2f7ab1e..061d2b3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -159,6 +159,10 @@ Bug Fixes
* SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev)
+* SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and
+ CloudSolrClient to be triggered on liveNode changes. Also add Predicate<DocCollection> equivalents
+ for callers that don't care about liveNodes. (hossman)
+
Other Changes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8e90dd8..af298c5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -68,12 +68,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
@@ -1052,7 +1052,7 @@ public class ZkController implements Closeable {
CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
- zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> {
+ zkStateReader.registerDocCollectionWatcher(collectionWithLocalReplica, (collectionState) -> {
if (collectionState == null) return false;
boolean foundStates = true;
for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
@@ -1194,7 +1194,7 @@ public class ZkController implements Closeable {
// check replica's existence in clusterstate first
try {
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
- TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+ TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
}
@@ -1301,7 +1301,7 @@ public class ZkController implements Closeable {
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
// the watcher is added to a set so multiple calls of this method will left only one watcher
- zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
+ zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
return shardId;
} finally {
@@ -1845,7 +1845,7 @@ public class ZkController implements Closeable {
AtomicReference<String> errorMessage = new AtomicReference<>();
AtomicReference<DocCollection> collectionState = new AtomicReference<>();
try {
- zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (n, c) -> {
+ zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
collectionState.set(c);
if (c == null)
return false;
@@ -2545,7 +2545,7 @@ public class ZkController implements Closeable {
};
}
- private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
+ private class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher {
String coreNodeName;
String shard;
String coreName;
@@ -2558,7 +2558,7 @@ public class ZkController implements Closeable {
@Override
// synchronized due to SOLR-11535
- public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ public synchronized boolean onStateChanged(DocCollection collectionState) {
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index dfd5a21..372ae53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
CollectionAdminParams.COLOCATED_WITH, collectionName);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
try {
- zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
+ zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
} catch (TimeoutException e) {
log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
// maybe the overseer queue is backed up, we don't want to fail the create request
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 2054258..6c0b147 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -134,7 +134,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection
- zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null);
+ zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
// we can delete any remaining unique aliases
if (!aliasReferences.isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index e38aa4a..c5c8e99 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -134,7 +134,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
- zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null);
+ zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
} catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 81407c6..d8be1f2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -416,7 +416,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
try {
- zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> {
+ zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
if (c == null)
return true;
Slice slice = c.getSlice(shard);
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 62a891a..1050b4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -35,8 +35,8 @@ import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -115,7 +115,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
- Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+ Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
.process(cluster.getSolrClient());
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
@@ -221,7 +221,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
Replica replica = getRandomReplica(shard);
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
- Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+ Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
new file mode 100644
index 0000000..4e21fb3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import java.util.Set;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+import static org.apache.solr.cloud.SolrCloudTestCase.clusterShape;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public void testWaitForStateAfterShutDown() throws Exception {
+ final String col_name = "test_col";
+ final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
+ (1, createTempDir(), JettyConfig.builder().build());
+ try {
+ log.info("Create our collection");
+ CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+
+ log.info("Sanity check that our collection has come online");
+ cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, clusterShape(1, 1));
+
+ log.info("Shutdown 1 node");
+ final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
+ nodeToStop.stop();
+ log.info("Wait to confirm our node is fully shutdown");
+ cluster.waitForJettyToStop(nodeToStop);
+
+ // now that we're confident that node has stopped, check if a waitForState
+ // call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
+ log.info("Now check if waitForState will recognize we already have the exepcted state");
+ cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
+
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ public void testWaitForStateBeforeShutDown() throws Exception {
+ final String col_name = "test_col";
+ final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool
+ (1, new DefaultSolrThreadFactory("background_executor"));
+ final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
+ (1, createTempDir(), JettyConfig.builder().build());
+ try {
+ log.info("Create our collection");
+ CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+
+ log.info("Sanity check that our collection has come online");
+ cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS,
+ SolrCloudTestCase.clusterShape(1, 1));
+
+
+ // HACK implementation detail...
+ //
+ // we know that in the current implementation, waitForState invokes the predicate twice
+ // independently of the current state of the collection and/or whether the predicate succeeds.
+ // If this implementation detail changes, (ie: so that it's only invoked once)
+ // then this number needs to change -- but the test fundamentally depends on the implementation
+ // calling the predicate at least once, which should also be necessary for any future impl
+ // (to verify that it didn't "miss" the state change when creating the watcher)
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ final Future<?> backgroundWaitForState = executor.submit
+ (() -> {
+ try {
+ cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
+ new LatchCountingPredicateWrapper(latch,
+ clusterShape(1, 0)));
+ } catch (Exception e) {
+ log.error("background thread got exception", e);
+ throw new RuntimeException(e);
+ }
+ return;
+ }, null);
+
+ log.info("Awaiting latch...");
+ if (! latch.await(120, TimeUnit.SECONDS)) {
+ fail("timed out Waiting a ridiculous amount of time for the waitForState latch -- did impl change?");
+ }
+
+ log.info("Shutdown 1 node");
+ final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
+ nodeToStop.stop();
+ log.info("Wait to confirm our node is fully shutdown");
+ cluster.waitForJettyToStop(nodeToStop);
+
+ // now that we're confident that node has stopped, check if a waitForState
+ // call will detect the missing replica -- shouldn't need long wait times...
+ log.info("Checking Future result to see if waitForState finished successfully");
+ try {
+ backgroundWaitForState.get();
+ } catch (ExecutionException e) {
+ log.error("background waitForState exception", e);
+ throw e;
+ }
+
+ } finally {
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
+ cluster.shutdown();
+ }
+ }
+
+ public final class LatchCountingPredicateWrapper implements CollectionStatePredicate {
+ private final CountDownLatch latch;
+ private final CollectionStatePredicate inner;
+ public LatchCountingPredicateWrapper(final CountDownLatch latch, final CollectionStatePredicate inner) {
+ this.latch = latch;
+ this.inner = inner;
+ }
+ public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
+ final boolean result = inner.matches(liveNodes, collectionState);
+ log.info("Predicate called: result={}, (pre)latch={}, liveNodes={}, state={}",
+ result, latch.getCount(), liveNodes, collectionState);
+ latch.countDown();
+ return result;
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
index 0853ee0..b40a7a2 100644
--- a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
+++ b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java
@@ -28,7 +28,7 @@ public class ZkStateReaderAccessor {
this.zkStateReader = zkStateReader;
}
- public Set<CollectionStateWatcher> getStateWatchers(String collection) {
+ public Set<DocCollectionWatcher> getStateWatchers(String collection) {
return zkStateReader.getStateWatchers(collection);
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 02fbb70..82645fd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.ResponseParser;
@@ -62,6 +63,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
@@ -362,11 +364,21 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
/**
- * Block until a collection state matches a predicate, or a timeout
+ * Block until a CollectionStatePredicate returns true, or the wait times out
*
+ * <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
+ * </p>
+ *
+ * <p>
+ * This implementation utilizes {@link CollectionStateWatcher} internally.
+ * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
+ * instead
+ * </p>
*
+ * @see #waitForState(String, long, TimeUnit, Predicate)
+ * @see #registerCollectionStateWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
@@ -379,14 +391,45 @@ public abstract class BaseCloudSolrClient extends SolrClient {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
}
+ /**
+ * Block until a Predicate returns true, or the wait times out
+ *
+ * <p>
+ * Note that the predicate may be called again even after it has returned true, so
+ * implementors should avoid changing state within the predicate call itself.
+ * </p>
+ *
+ * @see #registerDocCollectionWatcher
+ * @param collection the collection to watch
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate a {@link Predicate} to test against the {@link DocCollection}
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
+ */
+ public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+ throws InterruptedException, TimeoutException {
+ getClusterStateProvider().connect();
+ assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+ }
/**
* Register a CollectionStateWatcher to be called when the cluster state for a collection changes
+ * <em>or</em> the set of live nodes changes.
+ *
+ * <p>
+ * The Watcher will automatically be removed when its
+ * <code>onStateChanged</code> returns <code>true</code>
+ * </p>
*
- * Note that the watcher is unregistered after it has been called once. To make a watcher persistent,
- * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
- * call
+ * <p>
+ * This implementation utilizes {@link ZkStateReader#registerCollectionStateWatcher} internally.
+ * Callers that don't care about liveNodes are encouraged to use a {@link DocCollectionWatcher}
+ * instead
+ * </p>
*
+ * @see #registerDocCollectionWatcher(String, DocCollectionWatcher)
+ * @see ZkStateReader#registerCollectionStateWatcher
* @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes
*/
@@ -394,6 +437,23 @@ public abstract class BaseCloudSolrClient extends SolrClient {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
}
+
+ /**
+ * Register a DocCollectionWatcher to be called when the cluster state for a collection changes.
+ *
+ * <p>
+ * The Watcher will automatically be removed when its
+ * <code>onStateChanged</code> returns <code>true</code>
+ * </p>
+ *
+ * @see ZkStateReader#registerDocCollectionWatcher
+ * @param collection the collection to watch
+ * @param watcher a watcher that will be called when the state changes
+ */
+ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
+ getClusterStateProvider().connect();
+ assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher);
+ }
private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
index 37b00d7..a91a499 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
@@ -19,23 +19,27 @@ package org.apache.solr.common.cloud;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
/**
- * Interface to determine if a collection state matches a required state
+ * Interface to determine if a set of liveNodes and a collection's state matches some expectations.
*
* @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
+ * @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
*/
public interface CollectionStatePredicate {
/**
- * Check the collection state matches a required state
- *
+ * Check if the set of liveNodes <em>and</em> the collection state matches a required state
+ * <p>
* Note that both liveNodes and collectionState should be consulted to determine
* the overall state.
+ * </p>
*
* @param liveNodes the current set of live nodes
* @param collectionState the latest collection state, or null if the collection
* does not exist
+ * @return true if the input matches the requirements of this predicate
*/
boolean matches(Set<String> liveNodes, DocCollection collectionState);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
index d75823c..63bfaf9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
@@ -21,21 +21,24 @@ import java.util.Set;
/**
* Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
- * and called whenever the collection state changes.
+ * and called whenever there is a change in the collection state <em>or</em> in the list of liveNodes.
+ *
+ * @see DocCollectionWatcher
*/
public interface CollectionStateWatcher {
/**
- * Called when the collection we are registered against has a change of state.
+ * Called when either the collection we are registered against has a change of state <em>or</em> there is a change to the live nodes of our collection.
*
+ * <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates.
+ * </p>
*
* @param liveNodes the set of live nodes
* @param collectionState the new collection state (may be null if the collection has been
* deleted)
- *
* @return true if the watcher should be removed
*/
boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
similarity index 71%
copy from solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
copy to solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
index d75823c..0d65cfe 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java
@@ -17,27 +17,24 @@
package org.apache.solr.common.cloud;
-import java.util.Set;
-
/**
- * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
- * and called whenever the collection state changes.
+ * Callback registered with {@link ZkStateReader#registerDocCollectionWatcher(String, DocCollectionWatcher)}
+ * and called whenever the DocCollection changes.
*/
-public interface CollectionStateWatcher {
+public interface DocCollectionWatcher {
/**
* Called when the collection we are registered against has a change of state.
*
+ * <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates.
+ * </p>
*
- * @param liveNodes the set of live nodes
- * @param collectionState the new collection state (may be null if the collection has been
- * deleted)
- *
+ * @param collection the new collection state (may be null if the collection has been deleted)
* @return true if the watcher should be removed
*/
- boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);
+ boolean onStateChanged(DocCollection collection);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3af08d8..cd72203 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -185,7 +186,7 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener;
- private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
@@ -590,7 +591,7 @@ public class ZkStateReader implements SolrCloseable {
notifyCloudCollectionsListeners();
for (String collection : changedCollections) {
- notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
+ notifyStateWatchers(collection, clusterState.getCollectionOrNull(collection));
}
}
@@ -1598,9 +1599,46 @@ public class ZkStateReader implements SolrCloseable {
}
/**
- * Register a CollectionStateWatcher to be called when the state of a collection changes
+ * Register a CollectionStateWatcher to be called when the state of a collection changes
+ * <em>or</em> the set of live nodes changes.
+ *
+ * <p>
+ * The Watcher will automatically be removed when its
+ * <code>onStateChanged</code> returns <code>true</code>
+ * </p>
+ *
+ * <p>
+ * This method is just syntactic sugar for registering both a {@link DocCollectionWatcher} and
+ * a {@link LiveNodesListener}. Callers that only care about one or the other (but not both) are
+ * encouraged to use the more specific register methods, as doing so may reduce the number of
+ * ZooKeeper watchers needed, and reduce the amount of network/cpu used.
+ * </p>
+ *
+ * @see #registerDocCollectionWatcher
+ * @see #registerLiveNodesListener
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
+ // A single wrapper instance implements both DocCollectionWatcher and LiveNodesListener,
+ // so the same object can be registered in both places below.
+ final DocCollectionAndLiveNodesWatcherWrapper wrapper
+ = new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
+
+ // Hook the wrapper up for collection state changes and for live node changes.
+ registerDocCollectionWatcher(collection, wrapper);
+ registerLiveNodesListener(wrapper);
+
+ // Call the caller's watcher once with the current live nodes and collection state; if it is
+ // already satisfied, remove it again (removeCollectionStateWatcher undoes both registrations).
+ DocCollection state = clusterState.getCollectionOrNull(collection);
+ if (stateWatcher.onStateChanged(liveNodes, state) == true) {
+ removeCollectionStateWatcher(collection, stateWatcher);
+ }
+ }
+
+ /**
+ * Register a DocCollectionWatcher to be called when the state of a collection changes
+ *
+ * <p>
+ * The Watcher will automatically be removed when its
+ * <code>onStateChanged</code> returns <code>true</code>
+ * </p>
+ */
+ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
@@ -1616,17 +1654,27 @@ public class ZkStateReader implements SolrCloseable {
}
DocCollection state = clusterState.getCollectionOrNull(collection);
- if (stateWatcher.onStateChanged(liveNodes, state) == true) {
- removeCollectionStateWatcher(collection, stateWatcher);
+ if (stateWatcher.onStateChanged(state) == true) {
+ removeDocCollectionWatcher(collection, stateWatcher);
}
}
/**
* Block until a CollectionStatePredicate returns true, or the wait times out
*
+ * <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
+ * </p>
*
+ * <p>
+ * This implementation utilizes {@link CollectionStateWatcher} internally.
+ * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
+ * instead
+ * </p>
+ *
+ * @see #waitForState(String, long, TimeUnit, Predicate)
+ * @see #registerCollectionStateWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
@@ -1665,13 +1713,60 @@ public class ZkStateReader implements SolrCloseable {
waitLatches.remove(latch);
}
}
-
+
/**
- * Block until a LiveNodesStatePredicate returns true, or the wait times out
+ * Block until a Predicate returns true, or the wait times out
*
+ * <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
+ * </p>
*
+ * @param collection the collection to watch
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate the predicate to call on state changes
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
+ */
+ public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+ throws InterruptedException, TimeoutException {
+
+ if (closed) {
+ throw new AlreadyClosedException();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ waitLatches.add(latch);
+ // remembers the most recent state handed to the watcher, so a timeout message can report it
+ AtomicReference<DocCollection> docCollection = new AtomicReference<>();
+ DocCollectionWatcher watcher = (c) -> {
+ docCollection.set(c);
+ boolean matches = predicate.test(c);
+ if (matches)
+ latch.countDown();
+
+ return matches;
+ };
+
+ try {
+ // Register inside the try block: registration calls the watcher (and therefore the
+ // caller's predicate) right away, and if that throws, the finally clause must still
+ // remove the latch from waitLatches and drop the watcher.
+ registerDocCollectionWatcher(collection, watcher);
+
+ // wait for the watcher predicate to return true, or time out
+ if (!latch.await(wait, unit))
+ throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
+
+ }
+ finally {
+ removeDocCollectionWatcher(collection, watcher);
+ waitLatches.remove(latch);
+ }
+ }
+
+ /**
+ * Block until a LiveNodesStatePredicate returns true, or the wait times out
+ * <p>
+ * Note that the predicate may be called again even after it has returned true, so
+ * implementors should avoid changing state within the predicate call itself.
+ * </p>
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
@@ -1713,14 +1808,35 @@ public class ZkStateReader implements SolrCloseable {
/**
* Remove a watcher from a collection's watch list.
- *
+ * <p>
* This allows Zookeeper watches to be removed if there is no interest in the
* collection.
+ * </p>
*
+ * @see #registerCollectionStateWatcher
* @param collection the collection
* @param watcher the watcher
*/
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
+ // Build a fresh wrapper around the same (collection, watcher) pair. The wrapper's
+ // equals/hashCode are defined over exactly those two fields, so this instance is equal to
+ // the one created at registration time.
+ // NOTE(review): presumably both removals below locate the registered wrapper by equality -- confirm.
+ final DocCollectionAndLiveNodesWatcherWrapper wrapper
+ = new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
+
+ removeDocCollectionWatcher(collection, wrapper);
+ removeLiveNodesListener(wrapper);
+ }
+
+ /**
+ * Remove a watcher from a collection's watch list.
+ * <p>
+ * This allows Zookeeper watches to be removed if there is no interest in the
+ * collection.
+ * </p>
+ *
+ * @see #registerDocCollectionWatcher
+ * @param collection the collection
+ * @param watcher the watcher
+ */
+ public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
@@ -1742,8 +1858,8 @@ public class ZkStateReader implements SolrCloseable {
}
/* package-private for testing */
- Set<CollectionStateWatcher> getStateWatchers(String collection) {
- final Set<CollectionStateWatcher> watchers = new HashSet<>();
+ Set<DocCollectionWatcher> getStateWatchers(String collection) {
+ final Set<DocCollectionWatcher> watchers = new HashSet<>();
collectionWatches.compute(collection, (k, v) -> {
if (v != null) {
watchers.addAll(v.stateWatchers);
@@ -1845,12 +1961,12 @@ public class ZkStateReader implements SolrCloseable {
}
}
- private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
+ private void notifyStateWatchers(String collection, DocCollection collectionState) {
if (this.closed) {
return;
}
try {
- notifications.submit(new Notification(liveNodes, collection, collectionState));
+ notifications.submit(new Notification(collection, collectionState));
}
catch (RejectedExecutionException e) {
if (closed == false) {
@@ -1861,29 +1977,27 @@ public class ZkStateReader implements SolrCloseable {
private class Notification implements Runnable {
- final Set<String> liveNodes;
final String collection;
final DocCollection collectionState;
- private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) {
- this.liveNodes = liveNodes;
+ private Notification(String collection, DocCollection collectionState) {
this.collection = collection;
this.collectionState = collectionState;
}
@Override
public void run() {
- List<CollectionStateWatcher> watchers = new ArrayList<>();
+ List<DocCollectionWatcher> watchers = new ArrayList<>();
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
return null;
watchers.addAll(v.stateWatchers);
return v;
});
- for (CollectionStateWatcher watcher : watchers) {
+ for (DocCollectionWatcher watcher : watchers) {
try {
- if (watcher.onStateChanged(liveNodes, collectionState)) {
- removeCollectionStateWatcher(collection, watcher);
+ if (watcher.onStateChanged(collectionState)) {
+ removeDocCollectionWatcher(collection, watcher);
}
} catch (Exception exception) {
log.warn("Error on calling watcher", exception);
@@ -2118,4 +2232,54 @@ public class ZkStateReader implements SolrCloseable {
}
}
+ /**
+ * Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener}
+ * while wrapping and delegating to a {@link CollectionStateWatcher}.
+ *
+ * <p>
+ * Two wrappers are equal when they have the same collection name and equal delegates, so a
+ * newly constructed wrapper can stand in for a previously registered one.
+ * </p>
+ */
+ private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener {
+ private final String collectionName;
+ private final CollectionStateWatcher delegate;
+
+ public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName,
+ final CollectionStateWatcher delegate) {
+ this.collectionName = collectionName;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int hashCode() {
+ // Combine the two hashes asymmetrically; a plain product collapses to 0 whenever either
+ // component hashes to 0. Stays consistent with equals(): equal fields give equal hashes.
+ return 31 * collectionName.hashCode() + delegate.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) {
+ DocCollectionAndLiveNodesWatcherWrapper that
+ = (DocCollectionAndLiveNodesWatcherWrapper) other;
+ return this.collectionName.equals(that.collectionName)
+ && this.delegate.equals(that.delegate);
+ }
+ return false;
+ }
+
+ /**
+ * Collection state changed: call the delegate with the reader's current live nodes.
+ *
+ * @return true if the delegate asked to be removed
+ */
+ @Override
+ public boolean onStateChanged(DocCollection collectionState) {
+ final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes,
+ collectionState);
+ if (result) {
+ // it might be a while before live nodes changes, so proactively remove ourselves
+ removeLiveNodesListener(this);
+ }
+ return result;
+ }
+
+ /**
+ * Live nodes changed: call the delegate with the new live nodes and the reader's current
+ * state for the wrapped collection (null if the collection is not present).
+ *
+ * @return true if the delegate asked to be removed
+ */
+ @Override
+ public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+ final DocCollection collection = ZkStateReader.this.clusterState.getCollectionOrNull(collectionName);
+ final boolean result = delegate.onStateChanged(newLiveNodes, collection);
+ if (result) {
+ // it might be a while before collection changes, so proactively remove ourselves
+ removeDocCollectionWatcher(collectionName, this);
+ }
+ return result;
+ }
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 89cba06..f97b537 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -37,20 +37,22 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/** @see TestDocCollectionWatcher */
public class TestCollectionStateWatchers extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4;
- private static final int MAX_WAIT_TIMEOUT = 30;
+
+ private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
private ExecutorService executor = null;
@Before
public void prepareCluster() throws Exception {
configureCluster(CLUSTER_SIZE)
- .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
- .configure();
+ .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+ .configure();
executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
}
@@ -75,6 +77,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException {
+
Future<Boolean> future = executor.submit(() -> {
try {
while (true) {
@@ -100,50 +103,71 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
- //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
- public void testSimpleCollectionWatch() throws Exception {
-
+ public void testCollectionWatchWithShutdownOfActiveNode() throws Exception {
+ doTestCollectionWatchWithNodeShutdown(false);
+ }
+
+ @Test
+ public void testCollectionWatchWithShutdownOfUnusedNode() throws Exception {
+ doTestCollectionWatchWithNodeShutdown(true);
+ }
+
+ private void doTestCollectionWatchWithNodeShutdown(final boolean shutdownUnusedNode)
+ throws Exception {
+
CloudSolrClient client = cluster.getSolrClient();
- CollectionAdminRequest.createCollection("testcollection", "config", 4, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+ // note: one node in our cluster is unused by collection
+ CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
+ (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
+ final JettySolrRunner extraJetty = cluster.startJettySolrRunner();
+ final JettySolrRunner jettyToShutdown
+ = shutdownUnusedNode ? extraJetty : cluster.getJettySolrRunners().get(0);
+ final int expectedNodesWithActiveReplicas = CLUSTER_SIZE - (shutdownUnusedNode ? 0 : 1);
+
+ cluster.waitForAllNodes(MAX_WAIT_TIMEOUT);
+
// shutdown a node and check that we get notified about the change
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
- int nodeCount = 0;
+ int nodesWithActiveReplicas = 0;
log.info("State changed: {}", collectionState);
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes))
- nodeCount++;
+ nodesWithActiveReplicas++;
}
}
- if (nodeCount == 3) {
+ if (liveNodes.size() == CLUSTER_SIZE
+ && expectedNodesWithActiveReplicas == nodesWithActiveReplicas) {
latch.countDown();
return true;
}
return false;
});
- JettySolrRunner j = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
- cluster.waitForJettyToStop(j);
- assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ cluster.stopJettySolrRunner(jettyToShutdown);
+ cluster.waitForJettyToStop(jettyToShutdown);
+
+ assertTrue("CollectionStateWatcher was never notified of cluster change",
+ latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
- waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS,
- () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
+ waitFor("CollectionStateWatcher wasn't cleared after completion",
+ MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
}
@Test
- // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -151,9 +175,10 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
return false;
});
- assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ assertTrue("CollectionStateWatcher isn't called on new registration",
+ latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be retained",
- 1, client.getZkStateReader().getStateWatchers("currentstate").size());
+ 1, client.getZkStateReader().getStateWatchers("currentstate").size());
final CountDownLatch latch2 = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@@ -162,9 +187,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
});
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
- latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
- waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS,
- () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
+ latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
}
@Test
@@ -172,17 +197,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+ (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) {
try {
- client.waitForState("waitforstate", 1, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
- }
- catch (TimeoutException e) {
+ client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+ } catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied");
}
}
@@ -190,114 +215,145 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
- // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+ (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
- .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
+ .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
assertTrue("waitForState was not triggered by collection creation", future.get());
}
@Test
- // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> {
- client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
+ client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
+ ((liveNodes, collectionState) -> false));
});
- waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
- () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
+ waitFor("Watchers for collection should be removed after timeout",
+ MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
}
@Test
- //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
+ (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
final CountDownLatch firstCall = new CountDownLatch(1);
// stop a node, then add a watch waiting for all nodes to be back up
- JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
+ JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt
+ (cluster.getJettySolrRunners().size()));
cluster.waitForJettyToStop(node1);
- Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ (liveNodes, collectionState) -> {
firstCall.countDown();
return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1);
});
// first, stop another node; the watch should not be fired after this!
- JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
+ JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt
+ (cluster.getJettySolrRunners().size()));
// now start them both back up
cluster.startJettySolrRunner(node1);
- assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ assertTrue("CollectionStateWatcher not called",
+ firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
cluster.startJettySolrRunner(node2);
Boolean result = future.get();
- assertTrue("Did not see a fully active cluster after 30 seconds", result);
+ assertTrue("Did not see a fully active cluster", result);
}
@Test
- // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!",
- client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+ client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> {
- client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+ client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
});
- waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
- () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+ waitFor("Watchers for collection should be removed after timeout",
+ MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
}
@Test
- // commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void testDeletionsTriggerWatches() throws Exception {
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
- .process(cluster.getSolrClient());
- Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null);
+ .process(cluster.getSolrClient());
+
+ Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ (l, c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient());
- assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get());
+ assertTrue("CollectionStateWatcher not notified of delete call", future.get());
+ }
+
+ @Test
+ public void testLiveNodeChangesTriggerWatches() throws Exception {
+ final CloudSolrClient client = cluster.getSolrClient();
+
+ CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
+
+ // in the background, wait until the live node count is one more than the initial cluster size
+ Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ (l, c) -> (l.size() == 1 + CLUSTER_SIZE));
+
+ // start an extra node; the predicate above only looks at live nodes, not the collection
+ JettySolrRunner unusedJetty = cluster.startJettySolrRunner();
+ assertTrue("CollectionStateWatcher not notified of new node", future.get());
+
+ // once the predicate has matched, no watcher should remain for the collection
+ waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
+
+ // now wait for the live node count to drop back to the initial cluster size
+ future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ (l, c) -> (l.size() == CLUSTER_SIZE));
+
+ // stop the extra node again
+ cluster.stopJettySolrRunner(unusedJetty);
+
+ assertTrue("CollectionStateWatcher not notified of node lost", future.get());
+
+ waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
+
+ }
@Test
- //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
+ (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
- .processAndWait(client, MAX_WAIT_TIMEOUT);
- assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get());
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
+ assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
+ future.get());
- Future<Boolean> migrated
- = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
- (n, c) -> c != null && c.getStateFormat() == 2);
+ Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ (n, c) -> c != null && c.getStateFormat() == 2);
- CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
+ CollectionAdminRequest.migrateCollectionFormat("stateformat1")
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
new file mode 100644
index 0000000..f024a1c
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** @see TestCollectionStateWatchers */
+public class TestDocCollectionWatcher extends SolrCloudTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int CLUSTER_SIZE = 4;
+
+ private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
+
+ private ExecutorService executor = null;
+
+ @Before
+ public void prepareCluster() throws Exception {
+ // bring up a test cluster sized by CLUSTER_SIZE, with one configset registered as "config"
+ configureCluster(CLUSTER_SIZE)
+ .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+ .configure();
+ // pool used by waitInBackground/waitFor to run blocking waits off the test thread
+ executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
+ }
+
+ @After
+ public void tearDownCluster() throws Exception {
+ // stop accepting new background tasks, then shut the cluster down
+ executor.shutdown();
+ shutdownCluster();
+ // drop the reference; prepareCluster assigns a new pool
+ executor = null;
+ }
+
+ /**
+ * Runs a {@code waitForState} call for the given collection on the background executor.
+ *
+ * @return a Future that yields TRUE if the predicate matched in time, or FALSE if the wait
+ * timed out or was interrupted
+ */
+ private Future<Boolean> waitInBackground(String collection, long timeout, TimeUnit unit,
+ Predicate<DocCollection> predicate) {
+ final Callable<Boolean> blockingWait = () -> {
+ try {
+ cluster.getSolrClient().waitForState(collection, timeout, unit, predicate);
+ return Boolean.TRUE;
+ } catch (InterruptedException | TimeoutException e) {
+ return Boolean.FALSE;
+ }
+ };
+ return executor.submit(blockingWait);
+ }
+
+ /**
+ * Polls the given condition on the background executor (every 10ms) until it returns true,
+ * failing the test with {@code message} if that does not happen within the timeout.
+ */
+ private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
+ throws InterruptedException, ExecutionException {
+
+ final Future<Boolean> poller = executor.submit(() -> {
+ try {
+ while (!predicate.call()) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
+ return true;
+ }
+ catch (InterruptedException e) {
+ return false;
+ }
+ });
+
+ boolean satisfied = false;
+ try {
+ satisfied = poller.get(timeout, unit);
+ }
+ catch (TimeoutException e) {
+ // pass failure message on
+ }
+ if (satisfied) {
+ return;
+ }
+ poller.cancel(true);
+ fail(message);
+ }
+
+ @Test
+ public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
+
+ CloudSolrClient client = cluster.getSolrClient();
+ CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
+ .processAndWait(client, MAX_WAIT_TIMEOUT);
+
+ // first watcher never asks to be removed: it must fire on registration and stay registered
+ final CountDownLatch latch = new CountDownLatch(1);
+ client.registerDocCollectionWatcher("currentstate", (c) -> {
+ latch.countDown();
+ return false;
+ });
+
+ assertTrue("DocCollectionWatcher isn't called on new registration",
+ latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("DocCollectionWatcher should be retained",
+ 1, client.getZkStateReader().getStateWatchers("currentstate").size());
+
+ // second watcher returns true: it must also fire on registration, and then be removed
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ client.registerDocCollectionWatcher("currentstate", (c) -> {
+ latch2.countDown();
+ return true;
+ });
+
+ // await latch2 here: the first latch has already been counted down, so awaiting it again
+ // would pass even if the second watcher was never called
+ assertTrue("DocCollectionWatcher isn't called when registering for already-watched collection",
+ latch2.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
+ // only the first (retained) watcher should be left
+ waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+ () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
+ }
+
+ @Test
+ public void testWaitForStateChecksCurrentState() throws Exception {
+
+ final CloudSolrClient solrClient = cluster.getSolrClient();
+ // 1 shard, 1 replica, fully active
+ final CollectionStatePredicate fullyActive = (n, c) -> DocCollection.isFullyActive(n, c, 1, 1);
+
+ CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
+ .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+
+ solrClient.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, fullyActive);
+
+ // repeat several times, to check that we're not getting delayed state changes
+ for (int attempt = 0; attempt < 10; attempt++) {
+ try {
+ solrClient.waitForState("waitforstate", 1, TimeUnit.SECONDS, fullyActive);
+ } catch (TimeoutException e) {
+ fail("waitForState should return immediately if the predicate is already satisfied");
+ }
+ }
+
+ }
+
+ @Test
+ public void testCanWaitForNonexistantCollection() throws Exception {
+
+ // start waiting before the collection exists; the predicate only needs a non-null state
+ final Predicate<DocCollection> exists = (c) -> (null != c);
+ final Future<Boolean> sawCollection
+ = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, exists);
+
+ CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
+ .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
+
+ final boolean triggered = sawCollection.get();
+ assertTrue("waitForState was not triggered by collection creation", triggered);
+
+ }
+
+ /**
+  * A predicate that never passes must cause {@code waitForState} to throw a
+  * {@link TimeoutException}, after which no watchers may remain for the collection.
+  */
+ @Test
+ public void testPredicateFailureTimesOut() throws Exception {
+   final CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // two-argument predicate (live nodes + collection state) that always rejects
+   expectThrows(TimeoutException.class,
+       () -> solrClient.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
+           (nodes, state) -> false));
+
+   waitFor("Watchers for collection should be removed after timeout",
+       MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       () -> solrClient.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
+ }
+
+ /**
+  * A background wait whose predicate initially fails must stay registered, be re-run as
+  * the collection changes, and be removed once the predicate finally passes.
+  */
+ @Test
+ public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
+   final CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // create collection with 1 shard 1 replica, and wait for it to be fully active...
+   CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1)
+       .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+   solrClient.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (liveNodes, coll) -> DocCollection.isFullyActive(liveNodes, coll, 1, 1));
+
+   // set watcher waiting for at least 3 replicas (will fail initially);
+   // every invocation of the predicate is counted
+   final AtomicInteger invocations = new AtomicInteger(0);
+   final Future<Boolean> threeReplicasSeen = waitInBackground(
+       "falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (collectionState) -> {
+         invocations.incrementAndGet();
+         int replicaCount = 0;
+         for (Slice shard : collectionState) {
+           for (Replica counted : shard) {
+             replicaCount++;
+           }
+         }
+         return replicaCount >= 3;
+       });
+
+   // add a 2nd replica...
+   CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
+       .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+   solrClient.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (liveNodes, coll) -> DocCollection.isFullyActive(liveNodes, coll, 1, 2));
+
+   // confirm watcher has run at least once and has been retained...
+   final int invocationsSoFar = invocations.get();
+   assertTrue(invocationsSoFar > 0);
+   assertEquals(1, solrClient.getZkStateReader().getStateWatchers("falsepredicate").size());
+
+   // now add a 3rd replica...
+   CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
+       .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+
+   // now confirm watcher is invoked & removed
+   assertTrue("watcher never succeeded", threeReplicasSeen.get());
+   assertTrue(invocations.get() > invocationsSoFar);
+   waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       () -> solrClient.getZkStateReader().getStateWatchers("falsepredicate").size() == 0);
+ }
+
+ /**
+  * A single-argument predicate wait that times out must not leave a watcher registered
+  * for the (non-existent) collection.
+  */
+ @Test
+ public void testWatcherIsRemovedAfterTimeout() throws Exception {
+   final CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // precondition: nothing is watching this collection yet
+   final boolean noWatchersInitially =
+       solrClient.getZkStateReader().getStateWatchers("no-such-collection").isEmpty();
+   assertTrue("There should be no watchers for a non-existent collection!", noWatchersInitially);
+
+   // the predicate never passes, so the very short wait has to time out
+   expectThrows(TimeoutException.class,
+       () -> solrClient.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
+           (coll) -> false));
+
+   waitFor("Watchers for collection should be removed after timeout",
+       MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       () -> solrClient.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
+ }
+
+ /**
+  * A background wait for a {@code null} collection state must be satisfied when the
+  * collection is deleted.
+  */
+ @Test
+ public void testDeletionsTriggerWatches() throws Exception {
+   final CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // create the collection and wait until it is fully active
+   CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1).process(solrClient);
+   solrClient.waitForState("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (liveNodes, coll) -> DocCollection.isFullyActive(liveNodes, coll, 1, 1));
+
+   // start waiting for the collection state to become null, then delete the collection
+   final Future<Boolean> deletionSeen =
+       waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (coll) -> null == coll);
+   CollectionAdminRequest.deleteCollection("tobedeleted").process(solrClient);
+
+   final boolean notified = deletionSeen.get();
+   assertTrue("DocCollectionWatcher not notified of delete call", notified);
+ }
+
+ /**
+  * Background waits must be triggered both by the creation of a stateFormat=1 collection
+  * and by its later migration to stateFormat=2.
+  */
+ @Test
+ public void testWatchesWorkForStateFormat1() throws Exception {
+   final CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // start waiting for the collection before it is created
+   final Future<Boolean> creationSeen =
+       waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (coll) -> coll != null);
+
+   CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1)
+       .setStateFormat(1)
+       .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+   solrClient.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+       (liveNodes, coll) -> DocCollection.isFullyActive(liveNodes, coll, 1, 1));
+
+   assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
+       creationSeen.get());
+
+   // start waiting for the collection to report stateFormat 2, then migrate it
+   final Future<Boolean> migrationSeen =
+       waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
+           (coll) -> null != coll && 2 == coll.getStateFormat());
+
+   CollectionAdminRequest.migrateCollectionFormat("stateformat1")
+       .processAndWait(solrClient, MAX_WAIT_TIMEOUT);
+
+   assertTrue("DocCollectionWatcher did not persist over state format migration",
+       migrationSeen.get());
+ }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index eb2fd26..c9332bb 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -205,7 +205,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
- zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+ zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null)
return true;
return false;
@@ -237,7 +237,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Thread.sleep(100);
}
- zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+ zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null)
return false;
@@ -253,7 +253,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName,
Replica.State expectedState) throws InterruptedException, TimeoutException {
reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
- (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
+ (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index cdb6f68..ff87246 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -405,7 +405,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.setCreateNodeSet("")
.process(cloudClient).getStatus());
- cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount);
+ cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (c) -> c != null && c.getSlices().size() == sliceCount);
ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
@@ -585,7 +585,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
AtomicInteger nReplicas = new AtomicInteger();
try {
- client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> {
+ client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (c) -> {
if (c == null)
return false;
int numReplicas = getTotalReplicas(c, c.getName());
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 5faa307..a97832b 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
}
for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
- reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false);
+ reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
}
}