You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by md...@apache.org on 2021/02/01 19:25:36 UTC
[lucene-solr] branch master updated: SOLR-14253 Replace sleep calls
with ZK waits (#1297)
This is an automated email from the ASF dual-hosted git repository.
mdrob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 9974838 SOLR-14253 Replace sleep calls with ZK waits (#1297)
9974838 is described below
commit 99748384cfb16cdef2c5a116243cddc23cedf11c
Author: Mike Drob <md...@apache.org>
AuthorDate: Mon Feb 1 13:25:17 2021 -0600
SOLR-14253 Replace sleep calls with ZK waits (#1297)
Co-Authored-By: markrmiller <ma...@apache.org>
---
.../java/org/apache/solr/cloud/ZkController.java | 74 ++++------
.../solr/cloud/api/collections/AddReplicaCmd.java | 2 +-
.../OverseerCollectionMessageHandler.java | 163 +++++++++------------
.../solr/cloud/LeaderElectionIntegrationTest.java | 36 +----
.../OverseerCollectionConfigSetProcessorTest.java | 6 +
.../apache/solr/core/TestSolrConfigHandler.java | 6 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 81 +++++-----
7 files changed, 150 insertions(+), 218 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index bf164bf..8a958d5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -59,6 +59,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.AlreadyClosedException;
@@ -1695,60 +1696,39 @@ public class ZkController implements Closeable {
}
private void waitForCoreNodeName(CoreDescriptor descriptor) {
- int retryCount = 320;
- log.debug("look for our core node name");
- while (retryCount-- > 0) {
- final DocCollection docCollection = zkStateReader.getClusterState()
- .getCollectionOrNull(descriptor.getCloudDescriptor().getCollectionName());
- if (docCollection != null && docCollection.getSlicesMap() != null) {
- final Map<String, Slice> slicesMap = docCollection.getSlicesMap();
- for (Slice slice : slicesMap.values()) {
- for (Replica replica : slice.getReplicas()) {
- // TODO: for really large clusters, we could 'index' on this
-
- String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
- String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String msgNodeName = getNodeName();
- String msgCore = descriptor.getName();
-
- if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
- descriptor.getCloudDescriptor()
- .setCoreNodeName(replica.getName());
- getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
- return;
- }
- }
- }
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ log.debug("waitForCoreNodeName >>> look for our core node name");
+ try {
+ zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, c -> {
+ String name = ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName());
+ if (name == null) return false;
+ descriptor.getCloudDescriptor().setCoreNodeName(name);
+ return true;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e);
}
+ getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
}
- private void waitForShardId(CoreDescriptor cd) {
+ private void waitForShardId(final CoreDescriptor cd) {
if (log.isDebugEnabled()) {
log.debug("waiting to find shard id in clusterstate for {}", cd.getName());
}
- int retryCount = 320;
- while (retryCount-- > 0) {
- final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
- if (shardId != null) {
- cd.getCloudDescriptor().setShardId(shardId);
- return;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ try {
+ zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, c -> {
+ if (c == null) return false;
+ final String shardId = c.getShardId(getNodeName(), cd.getName());
+ if (shardId != null) {
+ cd.getCloudDescriptor().setShardId(shardId);
+ return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e);
}
-
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not get shard id for core: " + cd.getName());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c3f5004..6e34b0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -219,7 +219,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
params.set(CoreAdminParams.CORE_NODE_NAME,
- ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
+ ocmh.waitToSeeReplicasInState(collectionName, Collections.singleton(createReplica.coreName)).get(createReplica.coreName).getName());
String configName = zkStateReader.readConfigName(collectionName);
String routeKey = message.getStr(ShardParams._ROUTE_);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 0ba12e0..6686d67 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
@@ -52,6 +53,7 @@ import org.apache.solr.cloud.OverseerNodePrioritizer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
@@ -85,8 +87,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.util.RTimer;
-import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -487,59 +487,34 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
- int retryCount = 320;
- while (retryCount-- > 0) {
- final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
- if (docCollection != null && docCollection.getSlicesMap() != null) {
- Map<String,Slice> slicesMap = docCollection.getSlicesMap();
- for (Slice slice : slicesMap.values()) {
- for (Replica replica : slice.getReplicas()) {
- // TODO: for really large clusters, we could 'index' on this
-
- String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
- String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
- if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
- return replica.getName();
- }
- }
+ AtomicReference<String> coreNodeName = new AtomicReference<>();
+ try {
+ zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
+ String name = ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore);
+ if (name == null) {
+ return false;
}
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ coreNodeName.set(name);
+ return true;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for coreNodeName", e);
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+ return coreNodeName.get();
}
ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
- RTimer timer = new RTimer();
- int retryCount = 320;
- while (retryCount-- > 0) {
- ClusterState clusterState = zkStateReader.getClusterState();
- DocCollection collection = clusterState.getCollection(collectionName);
-
- if (collection == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Unable to find collection: " + collectionName + " in clusterstate");
- }
- Slice slice = collection.getSlice(sliceName);
- if (slice != null) {
- if (log.isDebugEnabled()) {
- log.debug("Waited for {}ms for slice {} of collection {} to be available",
- timer.getTime(), sliceName, collectionName);
- }
- return clusterState;
- }
- Thread.sleep(1000);
+ try {
+ zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
+ return c != null && c.getSlice(sliceName) != null;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for new slice", e);
}
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not find new slice " + sliceName + " in collection " + collectionName
- + " even after waiting for " + timer.getTime() + "ms"
- );
+ return zkStateReader.getClusterState();
}
DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
@@ -592,33 +567,31 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
overseer.offerStateUpdate(Utils.toJSON(message));
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- boolean areChangesVisible = true;
- while (!timeout.hasTimedOut()) {
- DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
- areChangesVisible = true;
- for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
- String updateKey = updateEntry.getKey();
-
- if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
- && !updateKey.equals(Overseer.QUEUE_OPERATION)
- && updateEntry.getValue() != null // handled below in a separate conditional
- && !updateEntry.getValue().equals(collection.get(updateKey))) {
- areChangesVisible = false;
- break;
- }
+ try {
+ zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, c -> {
+ if (c == null) return false;
+
+ for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
+ String updateKey = updateEntry.getKey();
- if (updateEntry.getValue() == null && collection.containsKey(updateKey)) {
- areChangesVisible = false;
- break;
+ if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
+ && !updateKey.equals(Overseer.QUEUE_OPERATION)
+ && updateEntry.getValue() != null // handled below in a separate conditional
+ && !updateEntry.getValue().equals(c.get(updateKey))) {
+ return false;
+ }
+ if (updateEntry.getValue() == null && c.containsKey(updateKey)) {
+ return false;
+ }
}
- }
- if (areChangesVisible) break;
- timeout.sleep(100);
- }
- if (!areChangesVisible)
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
+ return true;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ log.debug("modifyCollection(ClusterState={}, ZkNodeProps={}, NamedList={})", clusterState, message, results, e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to modify collection", e);
+ }
// if switching to/from read-only mode reload the collection
if (message.keySet().contains(ZkStateReader.READ_ONLY)) {
@@ -636,33 +609,29 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
assert coreNames.size() > 0;
- Map<String, Replica> result = new HashMap<>();
- TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
- while (true) {
- DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
- for (String coreName : coreNames) {
- if (result.containsKey(coreName)) continue;
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
- result.put(coreName, replica);
- break;
- }
- }
- }
- }
+ Map<String, Replica> results = new HashMap<>();
+ AtomicReference<DocCollection> lastState = new AtomicReference<>();
- if (result.size() == coreNames.size()) {
- return result;
- } else {
- log.debug("Expecting {} cores but found {}", coreNames, result);
- }
- if (timeout.hasTimedOut()) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
- }
-
- Thread.sleep(100);
+ long maxWait = Long.getLong("solr.waitToSeeReplicasInStateTimeoutSeconds", 120); // could be a big cluster
+ try {
+ zkStateReader.waitForState(collectionName, maxWait, TimeUnit.SECONDS, c -> {
+ if (c == null) return false;
+
+ c.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
+ .filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
+ .filter(r -> !results.containsKey(r.getCoreName())) // ...but not the ones we've seen already...
+ .forEach(r -> results.put(r.getCoreName(), r)); // ...get added to the map
+
+ lastState.set(c);
+ log.debug("Expecting {} cores, found {}", coreNames, results);
+ return results.size() == coreNames.size();
+ });
+ } catch (TimeoutException e) {
+ String error = "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + lastState.get();
+ throw new SolrException(ErrorCode.SERVER_ERROR, error);
}
+
+ return results;
}
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
index a5964d4..35ffa69 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
@@ -66,51 +67,24 @@ public class LeaderElectionIntegrationTest extends SolrCloudTestCase {
String collection = "collection1";
createCollection(collection);
+ cluster.waitForActiveCollection(collection, 10, TimeUnit.SECONDS, 2, 6);
List<JettySolrRunner> stoppedRunners = new ArrayList<>();
for (int i = 0; i < 4; i++) {
// who is the leader?
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
assertNotNull(jetty);
- assertTrue("shard1".equals(jetty.getCoreContainer().getCores().iterator().next()
- .getCoreDescriptor().getCloudDescriptor().getShardId()));
+ assertEquals("shard1", jetty.getCoreContainer().getCores().iterator().next()
+ .getCoreDescriptor().getCloudDescriptor().getShardId());
jetty.stop();
stoppedRunners.add(jetty);
-
- // poll until leader change is visible
- for (int j = 0; j < 90; j++) {
- String currentLeader = getLeader(collection);
- if(!leader.equals(currentLeader)) {
- break;
- }
- Thread.sleep(500);
- }
-
- leader = getLeader(collection);
- int retry = 0;
- while (jetty == getRunner(leader)) {
- if (retry++ == 60) {
- break;
- }
- Thread.sleep(1000);
- }
-
- if (jetty == getRunner(leader)) {
- cluster.getZkClient().printLayoutToStream(System.out);
- fail("We didn't find a new leader! " + jetty + " was close, but it's still showing as the leader");
- }
-
- assertTrue("shard1".equals(getRunner(leader).getCoreContainer().getCores().iterator().next()
- .getCoreDescriptor().getCloudDescriptor().getShardId()));
}
for (JettySolrRunner runner : stoppedRunners) {
runner.start();
}
waitForState("Expected to see nodes come back " + collection, collection,
- (n, c) -> {
- return n.size() == 6;
- });
+ (n, c) -> n.size() == 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
// testLeaderElectionAfterClientTimeout
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 6127e59..40538c5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.http.client.HttpClient;
import org.apache.solr.SolrTestCaseJ4;
@@ -303,6 +304,11 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
when(zkStateReaderMock.getZkClient()).thenReturn(solrZkClientMock);
when(zkStateReaderMock.getClusterState()).thenReturn(clusterStateMock);
when(zkStateReaderMock.getAliases()).thenReturn(Aliases.EMPTY);
+ doAnswer(invocation -> {
+ Predicate<DocCollection> p = invocation.getArgument(3);
+ p.test(clusterStateMock.getCollection(invocation.getArgument(0)));
+ return null;
+ }).when(zkStateReaderMock).waitForState(anyString(), anyLong(), any(), any(Predicate.class));
when(clusterStateMock.getCollection(anyString())).thenAnswer(invocation -> {
String key = invocation.getArgument(0);
diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
index c673809..2244c21 100644
--- a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
+++ b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
@@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.LinkedHashMapWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.apache.solr.handler.DumpRequestHandler;
@@ -47,6 +48,7 @@ import org.apache.solr.util.RESTfulServerProvider;
import org.apache.solr.util.RestTestBase;
import org.apache.solr.util.RestTestHarness;
import org.apache.solr.util.SimplePostTool;
+import org.apache.solr.util.TimeOut;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
@@ -613,10 +615,10 @@ public class TestSolrConfigHandler extends RestTestBase {
long maxTimeoutSeconds) throws Exception {
boolean success = false;
- long startTime = System.nanoTime();
LinkedHashMapWriter m = null;
- while (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
+ TimeOut timeOut = new TimeOut(maxTimeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (! timeOut.hasTimedOut()) {
try {
m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
} catch (Exception e) {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index dc98422..7cd09a1 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -130,6 +130,7 @@ public class MiniSolrCloudCluster {
" \n" +
"</solr>\n";
+ private final Object startupWait = new Object();
private volatile ZkTestServer zkServer; // non-final due to injectChaos()
private final boolean externalZkServer;
private final List<JettySolrRunner> jettys = new CopyOnWriteArrayList<>();
@@ -330,46 +331,45 @@ public class MiniSolrCloudCluster {
private void waitForAllNodes(int numServers, int timeoutSeconds) throws IOException, InterruptedException, TimeoutException {
log.info("waitForAllNodes: numServers={}", numServers);
- int numRunning = 0;
+ int numRunning;
if (timeoutSeconds == 0) {
timeoutSeconds = DEFAULT_TIMEOUT;
}
TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-
- while (true) {
- if (timeout.hasTimedOut()) {
- throw new IllegalStateException("giving up waiting for all jetty instances to be running. numServers=" + numServers
- + " numRunning=" + numRunning);
- }
- numRunning = 0;
- for (JettySolrRunner jetty : getJettySolrRunners()) {
- if (jetty.isRunning()) {
- numRunning++;
+
+ synchronized (startupWait) {
+ while (numServers != (numRunning = numRunningJetty(getJettySolrRunners()))) {
+ if (timeout.hasTimedOut()) {
+ throw new IllegalStateException("giving up waiting for all jetty instances to be running. numServers=" + numServers
+ + " numRunning=" + numRunning);
}
+ startupWait.wait(500);
}
- if (numServers == numRunning) {
- break;
- }
- Thread.sleep(100);
}
-
- ZkStateReader reader = getSolrClient().getZkStateReader();
- for (JettySolrRunner jetty : getJettySolrRunners()) {
- reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
+ for (JettySolrRunner runner : getJettySolrRunners()) {
+ waitForNode(runner, (int) timeout.timeLeft(TimeUnit.SECONDS));
}
}
- public void waitForNode(JettySolrRunner jetty, int timeoutSeconds)
- throws IOException, InterruptedException, TimeoutException {
- if (log.isInfoEnabled()) {
- log.info("waitForNode: {}", jetty.getNodeName());
+ private int numRunningJetty(List<JettySolrRunner> runners) {
+ int numRunning = 0;
+ for (JettySolrRunner jsr : runners) {
+ if (jsr.isRunning()) numRunning++;
}
+ return numRunning;
+ }
- ZkStateReader reader = getSolrClient().getZkStateReader();
+ public void waitForNode(JettySolrRunner jetty, int timeoutSeconds) throws InterruptedException, TimeoutException {
+ String nodeName = jetty.getNodeName();
+ if (nodeName == null) {
+ throw new IllegalArgumentException("Cannot wait for Jetty with null node name");
+ }
+ log.info("waitForNode: {}", nodeName);
- reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
+ ZkStateReader reader = getSolrClient().getZkStateReader();
+ reader.waitForLiveNodes(timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null && n.contains(nodeName));
}
/**
@@ -483,6 +483,9 @@ public class MiniSolrCloudCluster {
: new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig);
jetty.start();
jettys.add(jetty);
+ synchronized (startupWait) {
+ startupWait.notifyAll();
+ }
return jetty;
}
@@ -809,22 +812,20 @@ public class MiniSolrCloudCluster {
}
public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
- if (log.isInfoEnabled()) {
- log.info("waitForJettyToStop: {}", runner.getLocalPort());
+ String nodeName = runner.getNodeName();
+ if (nodeName == null) {
+ log.info("Cannot wait for Jetty with null node name");
+ return;
}
- TimeOut timeout = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while(!timeout.hasTimedOut()) {
- if (runner.isStopped()) {
- break;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- if (timeout.hasTimedOut()) {
- throw new TimeoutException("Waiting for Jetty to stop timed out");
+
+ log.info("waitForJettyToStop: {}", nodeName);
+
+ ZkStateReader reader = getSolrClient().getZkStateReader();
+ try {
+ reader.waitForLiveNodes(15, TimeUnit.SECONDS, (o, n) -> ! n.contains(nodeName));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted", e);
}
}