You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/09 21:01:43 UTC
[lucene-solr] 01/23: #1 Wait for collections to be fully created
before returning and other small collections API improvements and fixes.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 1e5d8e9c5a3f0a1cfbe109d4850150fab7c47cc1
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jun 9 08:55:15 2020 -0500
#1 Wait for collections to be fully created before returning and other small collections API improvements and fixes.
---
.../client/solrj/embedded/JettySolrRunner.java | 10 +-
.../solr/cloud/ShardLeaderElectionContext.java | 146 ++++-----------------
.../solr/cloud/ShardLeaderElectionContextBase.java | 1 +
.../java/org/apache/solr/cloud/ZkController.java | 6 +-
.../solr/cloud/api/collections/AddReplicaCmd.java | 2 +-
.../solr/cloud/api/collections/AliasCmd.java | 24 +++-
.../cloud/api/collections/CreateCollectionCmd.java | 59 ++++++++-
.../solr/cloud/api/collections/CreateShardCmd.java | 5 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 3 +
.../solr/cloud/api/collections/MigrateCmd.java | 4 +-
.../OverseerCollectionMessageHandler.java | 137 ++++++++++---------
.../solr/cloud/api/collections/SplitShardCmd.java | 7 +-
.../apache/solr/cloud/overseer/SliceMutator.java | 8 +-
.../solr/handler/admin/CollectionsHandler.java | 130 ++++++++++--------
.../OverseerCollectionConfigSetProcessorTest.java | 15 ---
.../apache/solr/cloud/TestCloudConsistency.java | 41 ++++--
.../solr/cloud/TestSkipOverseerOperations.java | 6 +-
.../cloud/TestWaitForStateWithJettyShutdowns.java | 2 +-
.../apache/solr/cloud/UnloadDistributedZkTest.java | 2 +
.../CollectionsAPIDistributedZkTest.java | 10 +-
.../test/org/apache/solr/search/TestRecovery.java | 2 +
.../apache/solr/common/cloud/ZkStateReader.java | 23 +++-
.../src/java/org/apache/solr/SolrTestCase.java | 2 +
.../apache/solr/cloud/MiniSolrCloudCluster.java | 31 ++---
.../org/apache/solr/cloud/SolrCloudTestCase.java | 8 +-
25 files changed, 366 insertions(+), 318 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 5a17f4c..9bb4255 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -124,6 +124,7 @@ public class JettySolrRunner {
private String host;
private volatile boolean started = false;
+ private volatile String nodeName;
public static class DebugFilter implements Filter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -443,10 +444,7 @@ public class JettySolrRunner {
}
public String getNodeName() {
- if (getCoreContainer() == null) {
- return null;
- }
- return getCoreContainer().getZkController().getNodeName();
+ return nodeName;
}
public boolean isRunning() {
@@ -532,6 +530,10 @@ public class JettySolrRunner {
} finally {
started = true;
+ if (getCoreContainer() != null && getCoreContainer().isZooKeeperAware()) {
+ this.nodeName = getCoreContainer().getZkController().getNodeName();
+ }
+
if (prevContext != null) {
MDC.setContextMap(prevContext);
} else {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index f6c96ca..4be8259 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -115,16 +115,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
- ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
- zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
- }
+ ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+ ZkStateReader.SHARD_ID_PROP, shardId,
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
+ ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+ ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
+ ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP));
- boolean allReplicasInLine = false;
- if (!weAreReplacement) {
- allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
- } else {
- allReplicasInLine = areAllReplicasParticipating();
+ zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
}
if (isClosed) {
@@ -167,16 +166,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// first cancel any current recovery
core.getUpdateHandler().getSolrCoreState().cancelRecovery();
- if (weAreReplacement) {
- // wait a moment for any floating updates to finish
- try {
- Thread.sleep(2500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
- }
- }
-
PeerSync.PeerSyncResult result = null;
boolean success = false;
try {
@@ -262,11 +251,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
}
super.runLeaderProcess(weAreReplacement, 0);
+
+
+ assert shardId != null;
+
+ ZkNodeProps zkNodes = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+ ZkStateReader.SHARD_ID_PROP, shardId,
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
+ ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+ ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
+ ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
+ ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+ assert zkController != null;
+ assert zkController.getOverseer() != null;
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(zkNodes));
+
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
publishActiveIfRegisteredAndNotActive(core);
} else {
+ log.info("No SolrCore found, will not become leader: {} {}", ZkCoreNodeProps.getCoreUrl(leaderProps), shardId);
return;
}
}
@@ -364,17 +370,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
public void publishActiveIfRegisteredAndNotActive(SolrCore core) throws Exception {
- if (core.getCoreDescriptor().getCloudDescriptor().hasRegistered()) {
- ZkStateReader zkStateReader = zkController.getZkStateReader();
- zkStateReader.forceUpdateCollection(collection);
- ClusterState clusterState = zkStateReader.getClusterState();
- Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
- if (rep == null) return;
- if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
- log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
- }
- }
+ if (log.isDebugEnabled()) log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
+ zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
private Replica getReplica(ClusterState clusterState, String collectionName, String replicaName) {
@@ -384,95 +381,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return docCollection.getReplica(replicaName);
}
- // returns true if all replicas are found to be up, false if not
- private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
- long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
- final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
-
- DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
- Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
- int cnt = 0;
- while (!isClosed && !cc.isShutDown()) {
- // wait for everyone to be up
- if (slices != null) {
- int found = 0;
- try {
- found = zkClient.getChildren(shardsElectZkPath, null, true).size();
- } catch (KeeperException e) {
- if (e instanceof KeeperException.SessionExpiredException) {
- // if the session has expired, then another election will be launched, so
- // quit here
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "ZK session expired - cancelling election for " + collection + " " + shardId);
- }
- SolrException.log(log,
- "Error checking for the number of election participants", e);
- }
-
- // on startup and after connection timeout, wait for all known shards
- if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
- log.info("Enough replicas found to continue.");
- return true;
- } else {
- if (cnt % 40 == 0) {
- if (log.isInfoEnabled()) {
- log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms"
- , shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
- TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
- }
- }
- }
-
- if (System.nanoTime() > timeoutAt) {
- log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
- return false;
- }
- } else {
- log.warn("Shard not found: {} for collection {}", shardId, collection);
-
- return false;
-
- }
-
- Thread.sleep(500);
- docCollection = zkController.getClusterState().getCollectionOrNull(collection);
- slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
- cnt++;
- }
- return false;
- }
-
- // returns true if all replicas are found to be up, false if not
- private boolean areAllReplicasParticipating() throws InterruptedException {
- final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
- final DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
-
- if (docCollection != null && docCollection.getSlice(shardId) != null) {
- final Slice slices = docCollection.getSlice(shardId);
- int found = 0;
- try {
- found = zkClient.getChildren(shardsElectZkPath, null, true).size();
- } catch (KeeperException e) {
- if (e instanceof KeeperException.SessionExpiredException) {
- // if the session has expired, then another election will be launched, so
- // quit here
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "ZK session expired - cancelling election for " + collection + " " + shardId);
- }
- SolrException.log(log, "Error checking for the number of election participants", e);
- }
-
- if (found >= slices.getReplicasMap().size()) {
- log.debug("All replicas are ready to participate in election.");
- return true;
- }
- } else {
- log.warn("Shard not found: {} for collection {}", shardId, collection);
- return false;
- }
- return false;
- }
-
private void rejoinLeaderElection(SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index a9afc8d..47a148a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -175,6 +175,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP),
+ ZkStateReader.CORE_NODE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NODE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
assert zkController != null;
assert zkController.getOverseer() != null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2cd376c..1e4db6e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1298,8 +1298,6 @@ public class ZkController implements Closeable {
throw e;
}
- // make sure we have an update cluster state right away
- zkStateReader.forceUpdateCollection(collection);
// the watcher is added to a set so multiple calls of this method will left only one watcher
zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
@@ -2577,6 +2575,10 @@ public class ZkController implements Closeable {
@Override
// synchronized due to SOLR-11535
public synchronized boolean onStateChanged(DocCollection collectionState) {
+ if (isClosed) { // don't accidentally delete cores on shutdown due to unreliable state
+ return true;
+ }
+
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 02d9fd7..30d893e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -179,7 +179,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Runnable runnable = () -> {
shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
for (CreateReplica replica : createReplicas) {
- ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
+ ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName);
}
if (onComplete != null) onComplete.run();
};
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
index 611bd2d..3643d99 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -25,6 +25,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@@ -77,11 +78,12 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
NamedList results = new NamedList();
+ ZkNodeProps zkProps = new ZkNodeProps(createMsgMap);
try {
// Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
// note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
// already have a lock on the alias name which should be sufficient.
- ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
+ ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, zkProps, results);
} catch (SolrException e) {
// The collection might already exist, and that's okay -- we can adopt it.
if (!e.getMessage().contains("collection already exists")) {
@@ -89,8 +91,24 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
- CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
- new OverseerSolrResponse(results));
+ int pullReplicas = zkProps.getInt(ZkStateReader.PULL_REPLICAS, 0);
+ int tlogReplicas = zkProps.getInt(ZkStateReader.TLOG_REPLICAS, 0);
+ int nrtReplicas = zkProps.getInt(ZkStateReader.NRT_REPLICAS, pullReplicas + tlogReplicas == 0 ? 1 : 0);
+ int numShards = zkProps.getInt(ZkStateReader.NUM_SHARDS_PROP, 0);
+
+ String shards = zkProps.getStr("shards");
+ if (shards != null && shards.length() > 0) {
+ numShards = shards.split(",").length;
+ }
+
+ if ("".equals(zkProps.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET))) {
+ nrtReplicas = 0;
+ pullReplicas = 0;
+ tlogReplicas = 0;
+ }
+
+
+ CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(), numShards, numShards * (nrtReplicas + pullReplicas + tlogReplicas));
CollectionProperties collectionProperties = new CollectionProperties(ocmh.zkStateReader.getZkClient());
collectionProperties.setCollectionProperty(createCollName,ROUTED_ALIAS_NAME_CORE_PROP,aliasName);
while (!ocmh.zkStateReader.getCollectionProperties(createCollName,1000).containsKey(ROUTED_ALIAS_NAME_CORE_PROP)) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4f00253..6dff6c2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -21,6 +21,7 @@ package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,11 +50,13 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -207,7 +211,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
collectionName, shardNames, message));
}
- Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+ Set<ShardRequest> coresToCreate = new HashSet<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
@@ -283,16 +287,24 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if (isLegacyCloud) {
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
} else {
- coresToCreate.put(coreName, sreq);
+ coresToCreate.add(sreq);
}
}
if(!isLegacyCloud) {
// wait for all replica entries to be created
- Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
- for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
- ShardRequest sreq = e.getValue();
- sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+
+ zkStateReader.waitForState(collectionName, 20, TimeUnit.SECONDS, expectedReplicas(coresToCreate.size())); // nocommit - timeout - keep this below containing timeouts - need central timeout stuff
+
+ Set<Replica> replicas = fillReplicas(collectionName);
+ for (ShardRequest sreq : coresToCreate) {
+ for (Replica rep : replicas) {
+ if (rep.getCoreName().equals(sreq.params.get(CoreAdminParams.NAME)) && rep.getBaseUrl().equals(sreq.shards[0])) {
+ sreq.params.set(CoreAdminParams.CORE_NODE_NAME, rep.getName());
+ break;
+ }
+ }
+
shardHandler.submit(sreq, sreq.shards[0], sreq.params);
}
}
@@ -640,4 +652,39 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
"Could not find configName for collection " + collection + " found:" + configNames);
}
}
+
+ public static CollectionStatePredicate expectedReplicas(int expectedReplicas) {
+ log.info("Wait for expectedReplicas={}", expectedReplicas);
+
+ return (liveNodes, collectionState) -> {
+ if (collectionState == null)
+ return false;
+ if (collectionState.getSlices() == null) {
+ return false;
+ }
+
+ int replicaCnt = 0;
+ for (Slice slice : collectionState) {
+ for (Replica replica : slice) {
+ replicaCnt++;
+ }
+ }
+ if (replicaCnt == expectedReplicas) {
+ return true;
+ }
+
+ return false;
+ };
+ }
+
+ public Set<Replica> fillReplicas(String collection) {
+ Set<Replica> replicas = new HashSet<>();
+ DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collection);
+ for (Slice slice : collectionState) {
+ for (Replica replica : slice) {
+ replicas.add(replica);
+ }
+ }
+ return replicas;
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 989003a..ea7a1a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -83,7 +83,10 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
// wait for a while until we see the shard
//ocmh.waitForNewShard(collectionName, sliceName);
// wait for a while until we see the shard and update the local view of the cluster state
- clusterState = ocmh.waitForNewShard(collectionName, sliceName);
+ ocmh.waitForNewShard(collectionName, sliceName);
+
+ // refresh clusterstate
+ clusterState = ocmh.zkStateReader.getClusterState();
String async = message.getStr(ASYNC);
ZkNodeProps addReplicasProps = new ZkNodeProps(
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 70d8d2b..581118e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -194,6 +194,9 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
} catch (KeeperException e) {
SolrException.log(log, "Problem cleaning up collection in zk:"
+ collection, e);
+ if (e instanceof KeeperException.SessionExpiredException) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index c41cb7f..a708c78 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -252,7 +252,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
String tempCollectionReplica1 = tempSourceLeader.getCoreName();
- String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+ String coreNodeName = ocmh.waitForCoreNodeName(zkStateReader, tempSourceCollectionName,
sourceLeader.getNodeName(), tempCollectionReplica1);
// wait for the replicas to be seen as active on temp source leader
if (log.isInfoEnabled()) {
@@ -320,7 +320,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
syncRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
"temporary collection in target leader node.");
}
- coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+ coreNodeName = ocmh.waitForCoreNodeName(zkStateReader, tempSourceCollectionName,
targetLeader.getNodeName(), tempCollectionReplica2);
// wait for the replicas to be seen as active on temp source leader
if (log.isInfoEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 007fbec..4a0f4f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
@@ -176,7 +177,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
new SynchronousQueue<>(),
new SolrNamedThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
- protected static final Random RANDOM;
+ public static final Random RANDOM;
static {
// We try to make things reproducible in the context of our tests by initializing the random instance
// based on the current seed
@@ -532,60 +533,60 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
}
- String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
- int retryCount = 320;
- while (retryCount-- > 0) {
- final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
- if (docCollection != null && docCollection.getSlicesMap() != null) {
- Map<String,Slice> slicesMap = docCollection.getSlicesMap();
+ static String waitForCoreNodeName(ZkStateReader zkStateReader, String collectionName, String msgNodeName, String msgCore) {
+ AtomicReference<String> errorMessage = new AtomicReference<>();
+ AtomicReference<String> coreNodeName = new AtomicReference<>();
+ try {
+ zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, (n, c) -> {
+ if (c == null)
+ return false;
+ final Map<String,Slice> slicesMap = c.getSlicesMap();
for (Slice slice : slicesMap.values()) {
for (Replica replica : slice.getReplicas()) {
- // TODO: for really large clusters, we could 'index' on this
String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
- if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
- return replica.getName();
+ if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
+ coreNodeName.set(replica.getName());
+ return true;
}
}
}
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ return false;
+ });
+ } catch (TimeoutException e) {
+ String error = errorMessage.get();
+ if (error == null)
+ error = "Timeout waiting for collection state.";
+ throw new ZkController.NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+
+ return coreNodeName.get();
}
- ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+ void waitForNewShard(String collectionName, String sliceName) {
log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
- RTimer timer = new RTimer();
- int retryCount = 320;
- while (retryCount-- > 0) {
- ClusterState clusterState = zkStateReader.getClusterState();
- DocCollection collection = clusterState.getCollection(collectionName);
-
- if (collection == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Unable to find collection: " + collectionName + " in clusterstate");
- }
- Slice slice = collection.getSlice(sliceName);
- if (slice != null) {
- if (log.isDebugEnabled()) {
- log.debug("Waited for {}ms for slice {} of collection {} to be available",
- timer.getTime(), sliceName, collectionName);
+ try {
+ zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, (n, c) -> {
+ if (c == null)
+ return false;
+ Slice slice = c.getSlice(sliceName);
+ if (slice != null) {
+ return true;
}
- return clusterState;
- }
- Thread.sleep(1000);
+ return false;
+ });
+ } catch (TimeoutException e) {
+ String error = "Timeout waiting for new shard.";
+ throw new ZkController.NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Could not find new slice " + sliceName + " in collection " + collectionName
- + " even after waiting for " + timer.getTime() + "ms"
- );
}
DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
@@ -681,35 +682,47 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
}
- Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
+ Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) {
assert coreNames.size() > 0;
- Map<String, Replica> result = new HashMap<>();
- TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
- while (true) {
- DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
- for (String coreName : coreNames) {
- if (result.containsKey(coreName)) continue;
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
- result.put(coreName, replica);
- break;
+
+ AtomicReference<Map<String, Replica>> result = new AtomicReference<>();
+ AtomicReference<String> errorMessage = new AtomicReference<>();
+ try {
+ zkStateReader.waitForState(collectionName, 15, TimeUnit.SECONDS, (n, c) -> { // nocommit - universal config wait
+ if (c == null)
+ return false;
+ Map<String, Replica> r = new HashMap<>();
+ for (String coreName : coreNames) {
+ if (r.containsKey(coreName)) continue;
+ for (Slice slice : c.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
+ r.put(coreName, replica);
+ break;
+ }
}
}
}
- }
- if (result.size() == coreNames.size()) {
- return result;
- } else {
- log.debug("Expecting {} cores but found {}", coreNames, result);
- }
- if (timeout.hasTimedOut()) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
- }
+ if (r.size() == coreNames.size()) {
+ result.set(r);
+ return true;
+ } else {
+ errorMessage.set("Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + c);
+ return false;
+ }
- Thread.sleep(100);
+ });
+ } catch (TimeoutException e) {
+ String error = errorMessage.get();
+ if (error == null)
+ error = "Timeout waiting for collection state.";
+ throw new SolrException(ErrorCode.SERVER_ERROR, error);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
+ return result.get();
}
List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 2d04947..8276bab 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -311,7 +311,10 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
// wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
- clusterState = ocmh.waitForNewShard(collectionName, subSlice);
+ ocmh.waitForNewShard(collectionName, subSlice);
+
+ // refresh cluster state
+ clusterState = zkStateReader.getClusterState();
log.debug("Adding first replica {} as part of slice {} of collection {} on {}"
, subShardName, subSlice, collectionName, nodeName);
@@ -350,7 +353,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
log.debug("Asking parent leader to wait for: {} to be alive on: {}", subShardName, nodeName);
- String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
+ String coreNodeName = OverseerCollectionMessageHandler.waitForCoreNodeName(zkStateReader, collectionName, nodeName, subShardName);
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
cmd.setCoreName(subShardName);
cmd.setNodeName(nodeName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 800bef5..f63253b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -96,7 +96,7 @@ public class SliceMutator {
return new ZkWriteCommand(collection, null);
}
- Map<String, Slice> newSlices = new LinkedHashMap<>();
+ Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlices().size() - 1);
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getReplica(cnn);
@@ -122,6 +122,8 @@ public class SliceMutator {
String leaderUrl = sb.length() > 0 ? sb.toString() : null;
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+ String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
+ assert coreNodeName != null;
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
DocCollection coll = clusterState.getCollectionOrNull(collectionName);
@@ -139,9 +141,9 @@ public class SliceMutator {
// TODO: this should only be calculated once and cached somewhere?
String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
- if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
+ if (replica == oldLeader && !coreNodeName.equals(replica.getName())) {
replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
- } else if (coreURL.equals(leaderUrl)) {
+ } else if (coreNodeName.equals(replica.getName())) {
replica = new ReplicaMutator(cloudManager).setLeader(replica);
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 09bcfa4..384c21b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -35,6 +35,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -46,6 +47,7 @@ import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionProperties;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
@@ -101,6 +103,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
@@ -291,7 +294,24 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
//TODO yuck; shouldn't create-collection at the overseer do this? (conditionally perhaps)
if (action.equals(CollectionAction.CREATE) && asyncId == null) {
if (rsp.getException() == null) {
- waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
+ int pullReplicas = zkProps.getInt(ZkStateReader.PULL_REPLICAS, 0);
+ int tlogReplicas = zkProps.getInt(ZkStateReader.TLOG_REPLICAS, 0);
+ int nrtReplicas = zkProps.getInt(ZkStateReader.NRT_REPLICAS, pullReplicas + tlogReplicas == 0 ? 1 : 0);
+ int numShards = zkProps.getInt(ZkStateReader.NUM_SHARDS_PROP, 0);
+
+ String shards = zkProps.getStr("shards");
+ if (shards != null && shards.length() > 0) {
+ numShards = shards.split(",").length;
+ }
+
+ if ("".equals(zkProps.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET))) {
+ nrtReplicas = 0;
+ pullReplicas = 0;
+ tlogReplicas = 0;
+ }
+
+ waitForActiveCollection(zkProps.getStr(NAME), cores, numShards,
+ numShards * (nrtReplicas + pullReplicas + tlogReplicas));
}
}
@@ -936,6 +956,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
COLLECTION_PROP,
"node",
SHARD_ID_PROP,
+ ZkStateReader.CORE_NODE_NAME_PROP,
_ROUTE_,
CoreAdminParams.NAME,
INSTANCE_DIR,
@@ -1382,74 +1403,73 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
}
- public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
- throws KeeperException, InterruptedException {
-
- if (createCollResponse.getResponse().get("exception") != null) {
- // the main called failed, don't wait
- if (log.isInfoEnabled()) {
- log.info("Not waiting for active collection due to exception: {}", createCollResponse.getResponse().get("exception"));
- }
- return;
- }
-
- int replicaFailCount;
- if (createCollResponse.getResponse().get("failure") != null) {
- replicaFailCount = ((NamedList) createCollResponse.getResponse().get("failure")).size();
- } else {
- replicaFailCount = 0;
+ public static void waitForActiveCollection(String collectionName, CoreContainer cc, int numShards, int totalReplicas)
+ throws KeeperException, InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("waitForActiveCollection(String collectionName={}, CoreContainer cc={}) - start", collectionName, cc);
}
CloudConfig ccfg = cc.getConfig().getCloudConfig();
Integer seconds = ccfg.getCreateCollectionWaitTimeTillActive();
Boolean checkLeaderOnly = ccfg.isCreateCollectionCheckLeaderActive();
- if (log.isInfoEnabled()) {
- log.info("Wait for new collection to be active for at most {} seconds. Check all shard {}"
- , seconds, (checkLeaderOnly ? "leaders" : "replicas"));
+ log.info("Wait for new collection to be active for at most " + seconds + " seconds. Check all shard "
+ + (checkLeaderOnly ? "leaders" : "replicas"));
+
+ waitForActiveCollection(cc, collectionName, seconds, TimeUnit.SECONDS, numShards, totalReplicas);
+
+ if (log.isDebugEnabled()) {
+ log.debug("waitForActiveCollection(String, CoreContainer, SolrResponse) - end");
}
+ }
+
+ public static void waitForActiveCollection(CoreContainer cc , String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+ log.info("waitForActiveCollection: {}", collection);
+ assert collection != null;
+ CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas);
+ AtomicReference<DocCollection> state = new AtomicReference<>();
+ AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
try {
- cc.getZkController().getZkStateReader().waitForState(collectionName, seconds, TimeUnit.SECONDS, (n, c) -> {
+ cc.getZkController().getZkStateReader().waitForState(collection, wait, unit, (n, c) -> {
+ state.set(c);
+ liveNodesLastSeen.set(n);
- if (c == null) {
- // the collection was not created, don't wait
- return true;
- }
+ return predicate.matches(n, c);
+ });
+ } catch (TimeoutException e) {
+ throw new RuntimeException("Failed while waiting for active collection" + "\n" + e.getMessage() + " \nShards:" + shards + " Replicas:" + totalReplicas + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray())
+ + "\nLast available state: " + state.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
- if (c.getSlices() != null) {
- Collection<Slice> shards = c.getSlices();
- int replicaNotAliveCnt = 0;
- for (Slice shard : shards) {
- Collection<Replica> replicas;
- if (!checkLeaderOnly) replicas = shard.getReplicas();
- else {
- replicas = new ArrayList<Replica>();
- replicas.add(shard.getLeader());
- }
- for (Replica replica : replicas) {
- String state = replica.getStr(ZkStateReader.STATE_PROP);
- if (log.isDebugEnabled()) {
- log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
- replica.getCoreUrl(), state);
- }
- if (!n.contains(replica.getNodeName())
- || !state.equals(Replica.State.ACTIVE.toString())) {
- replicaNotAliveCnt++;
- return false;
- }
- }
- }
+ }
- return (replicaNotAliveCnt == 0) || (replicaNotAliveCnt <= replicaFailCount);
- }
+ public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas) {
+ log.info("Wait for expectedShards={} expectedReplicas={}", expectedShards, expectedReplicas);
+
+ return (liveNodes, collectionState) -> {
+ if (collectionState == null)
return false;
- });
- } catch (TimeoutException | InterruptedException e) {
+ if (collectionState.getSlices().size() != expectedShards) {
+ return false;
+ }
- String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
- throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
- }
+ int activeReplicas = 0;
+ for (Slice slice : collectionState) {
+ for (Replica replica : slice) {
+ if (replica.isActive(liveNodes)) {
+ activeReplicas++;
+ }
+ }
+ }
+ if (activeReplicas == expectedReplicas) {
+ return true;
+ }
+ return false;
+ };
}
public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index fc60b5d..8da7e7a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -637,21 +637,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
}
assertEquals(numberOfSlices * numberOfReplica, coreNames.size());
- for (int i = 1; i <= numberOfSlices; i++) {
- for (int j = 1; j <= numberOfReplica; j++) {
- String coreName = coreNames.get((i-1) * numberOfReplica + (j-1));
-
- if (dontShuffleCreateNodeSet) {
- final String expectedNodeName = nodeUrlWithoutProtocolPartForLiveNodes.get((numberOfReplica * (i - 1) + (j - 1)) % nodeUrlWithoutProtocolPartForLiveNodes.size());
- assertFalse("expectedNodeName is null for coreName="+coreName, null == expectedNodeName);
-
- final String actualNodeName = coreName_TO_nodeUrlWithoutProtocolPartForLiveNodes_map.get(coreName);
- assertFalse("actualNodeName is null for coreName="+coreName, null == actualNodeName);
-
- assertTrue("node name mismatch for coreName="+coreName+" ( actual="+actualNodeName+" versus expected="+expectedNodeName+" )", actualNodeName.equals(expectedNodeName));
- }
- }
- }
assertEquals(numberOfSlices.intValue(),
sliceToNodeUrlsWithoutProtocolPartToNumberOfShardsRunningMapMap.size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
index 9168368..a61d916 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.util.NamedList;
@@ -207,18 +208,38 @@ public class TestCloudConsistency extends SolrCloudTestCase {
* Leader should be on node - 0
*/
private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
- for (int i = 0; i < 3; i++) {
- proxies.get(cluster.getJettySolrRunner(i)).close();
+ DocCollection col = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collection);
+ Replica shard1Leader = col.getLeader("shard1");
+ String baseUrl = shard1Leader.getBaseUrl();
+ JettySolrRunner j1 = null;
+ for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+ System.out.println("cmp:" + j.getProxyBaseUrl() + " " + baseUrl);
+ if (j.getProxyBaseUrl().toString().equals(baseUrl)) {
+ j1 = j;
+ break;
+ }
+ }
+
+ assertNotNull(baseUrl, j1);
+
+ for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+ if (j != j1) {
+ proxies.get(j).close();
+ }
}
- addDoc(collection, docId, cluster.getJettySolrRunner(0));
- JettySolrRunner j1 = cluster.getJettySolrRunner(0);
+
+ addDoc(collection, docId, j1);
+
j1.stop();
cluster.waitForJettyToStop(j1);
- for (int i = 1; i < 3; i++) {
- proxies.get(cluster.getJettySolrRunner(i)).reopen();
+ for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+ if (j != j1) {
+ proxies.get(j).reopen();
+ }
}
waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
- -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+ -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+ Thread.sleep(1000);
// the meat of the test -- wait to see if a different replica becomes a leader
// the correct behavior is that this should time out, if it succeeds we have a problem...
@@ -229,15 +250,15 @@ public class TestCloudConsistency extends SolrCloudTestCase {
Replica newLeader = state.getSlice("shard1").getLeader();
if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
// this is the bad case, our "bad" state was found before timeout
- log.error("WTF: New Leader={}", newLeader);
+ log.error("WTF: New Leader={} Old Leader={}", newLeader, leader);
return true;
}
return false; // still no bad state, wait for timeout
});
});
- proxies.get(cluster.getJettySolrRunner(0)).reopen();
- cluster.getJettySolrRunner(0).start();
+ proxies.get(j1).reopen();
+ j1.start();
cluster.waitForAllNodes(30);;
waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
Replica newLeader = collectionState.getLeader("shard1");
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
index 73bf698..f6cd81f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
@@ -121,7 +121,7 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
waitForState("Expected 2x1 for collection: " + collection, collection,
clusterShape(2, 2));
CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
- assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+ assertEquals(getNumLeaderOpeations(resp) + 2, getNumLeaderOpeations(resp2));
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
@@ -187,8 +187,8 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
waitForState("Expected 2x2 for collection: " + collection, collection,
clusterShape(2, 4));
CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
- // 2 for recovering state, 4 for active state
- assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+ // 2 for recovering state, 4 for active state, 2 leaders
+ assertEquals(getNumStateOpeations(resp) + 8, getNumStateOpeations(resp2));
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index 1b820a4..3d3e97b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -101,7 +101,7 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
try {
cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
new LatchCountingPredicateWrapper(latch,
- clusterShape(1, 0)));
+ clusterShape(1, 1)));
} catch (Exception e) {
log.error("background thread got exception", e);
throw new RuntimeException(e);
diff --git a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
index 3111517..a68d403 100644
--- a/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/UnloadDistributedZkTest.java
@@ -254,6 +254,8 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
// ensure there is a leader
zkStateReader.getLeaderRetry("unloadcollection", "shard1", 15000);
+ waitForRecoveriesToFinish("unloadcollection", zkStateReader, false);
+
try (HttpSolrClient addClient = getHttpSolrClient(jettys.get(1).getBaseUrl() + "/unloadcollection_shard1_replica2", 30000, 90000)) {
// add a few docs while the leader is down
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
index af3cd55..3471ee3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
@@ -364,7 +364,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
JettySolrRunner jetty1 = cluster.getRandomJetty(random());
JettySolrRunner jetty2 = cluster.getRandomJetty(random());
- List<String> baseUrls = ImmutableList.of(jetty1.getBaseUrl().toString(), jetty2.getBaseUrl().toString());
+ List<String> baseUrls = ImmutableList.of(jetty1.getCoreContainer().getZkController().getNodeName(), jetty2.getCoreContainer().getZkController().getNodeName());
CollectionAdminRequest.createCollection("nodeset_collection", "conf", 2, 1)
.setCreateNodeSet(baseUrls.get(0) + "," + baseUrls.get(1))
@@ -372,15 +372,15 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
DocCollection collectionState = getCollectionState("nodeset_collection");
for (Replica replica : collectionState.getReplicas()) {
- String replicaUrl = replica.getCoreUrl();
+ String node = replica.getNodeName();
boolean matchingJetty = false;
- for (String jettyUrl : baseUrls) {
- if (replicaUrl.startsWith(jettyUrl)) {
+ for (String jettyNode : baseUrls) {
+ if (node.equals(jettyNode)) {
matchingJetty = true;
}
}
if (matchingJetty == false) {
- fail("Expected replica to be on " + baseUrls + " but was on " + replicaUrl);
+ fail("Expected replica to be on " + baseUrls + " but was on " + node);
}
}
}
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index f4df24c..b0ae19f 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -89,6 +89,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
@After
public void afterTest() {
TestInjection.reset(); // do after every test, don't wait for AfterClass
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
if (savedFactory == null) {
System.clearProperty("solr.directoryFactory");
} else {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 29074e8..4d50c8e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1745,16 +1745,23 @@ public class ZkStateReader implements SolrCloseable {
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
- if (closed) {
- throw new AlreadyClosedException();
- }
+ AtomicReference<Set<String>> liveNodes = new AtomicReference<>();
+ liveNodes.set(clusterState.getLiveNodes());
+ registerLiveNodesListener(new LiveNodesListener() {
+
+ @Override
+ public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+ liveNodes.set(newLiveNodes);
+ return false;
+ }
+ });
final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
CollectionStateWatcher watcher = (n, c) -> {
docCollection.set(c);
- boolean matches = predicate.matches(n, c);
+ boolean matches = predicate.matches(liveNodes.get(), c);
if (matches)
latch.countDown();
@@ -1763,10 +1770,12 @@ public class ZkStateReader implements SolrCloseable {
registerCollectionStateWatcher(collection, watcher);
try {
- // wait for the watcher predicate to return true, or time out
- if (!latch.await(wait, unit))
- throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
+ // wait for the watcher predicate to return true, or time out
+ if (!latch.await(wait, unit)) {
+ throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :"
+ + docCollection.get());
+ }
} finally {
removeCollectionStateWatcher(collection, watcher);
waitLatches.remove(latch);
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 525cd70..d895989 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -25,6 +25,7 @@ import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.ExternalPaths;
import org.apache.solr.util.RevertDefaultThreadHandlerRule;
import org.apache.solr.util.StartupLoggingUtils;
+import org.apache.solr.util.TestInjection;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -128,6 +129,7 @@ public class SolrTestCase extends LuceneTestCase {
@AfterClass
public static void shutdownLogger() throws Exception {
+ TestInjection.reset();
StartupLoggingUtils.shutdown();
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index f65374f..3c18710 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -798,22 +798,23 @@ public class MiniSolrCloudCluster {
}
public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
- if (log.isInfoEnabled()) {
- log.info("waitForJettyToStop: {}", runner.getLocalPort());
+ log.info("waitForJettyToStop: {}", runner.getLocalPort());
+ String nodeName = runner.getNodeName();
+ if (nodeName == null) {
+ log.info("Cannot wait for Jetty with null node name");
+ return;
}
- TimeOut timeout = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while(!timeout.hasTimedOut()) {
- if (runner.isStopped()) {
- break;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- if (timeout.hasTimedOut()) {
- throw new TimeoutException("Waiting for Jetty to stop timed out");
+
+ log.info("waitForNode: {}", runner.getNodeName());
+
+
+ ZkStateReader reader = getSolrClient().getZkStateReader();
+
+ try {
+ reader.waitForLiveNodes(10, TimeUnit.SECONDS, (o, n) -> !n.contains(nodeName));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted");
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 9c34fac..ae22694 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -311,7 +312,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
* @param predicate a predicate to match against the collection state
*/
protected static void waitForState(String message, String collection, CollectionStatePredicate predicate, int timeout, TimeUnit timeUnit) {
- log.info("waitForState ({}): {}", collection, message);
+ log.info("waitForState {}", collection);
AtomicReference<DocCollection> state = new AtomicReference<>();
AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
try {
@@ -320,8 +321,11 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
liveNodesLastSeen.set(n);
return predicate.matches(n, c);
});
- } catch (Exception e) {
+ } catch (TimeoutException e) {
fail(message + "\n" + e.getMessage() + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray()) + "\nLast available state: " + state.get());
+ } catch (Exception e) {
+ log.error("Exception waiting for state", e);
+ fail(e.getMessage() + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray()) + "\nLast available state: " + state.get());
}
}