You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/12/11 04:12:21 UTC
[lucene-solr] branch jira/solr14003 updated: eliminated all uses of
Replica#getState()
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr14003
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr14003 by this push:
new 7b80f07 eliminated all uses of Replica#getState()
7b80f07 is described below
commit 7b80f071ed3fc784d59a993f3568b0ead0907fcb
Author: noble <no...@apache.org>
AuthorDate: Wed Dec 11 15:07:24 2019 +1100
eliminated all uses of Replica#getState()
---
.../stream/AnalyticsShardRequestManager.java | 4 +--
.../org/apache/solr/cloud/ElectionContext.java | 2 +-
.../apache/solr/cloud/ExclusiveSliceProperty.java | 6 +++--
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 5 ++--
.../java/org/apache/solr/cloud/ZkController.java | 9 ++++---
.../solr/cloud/api/collections/BackupCmd.java | 18 +++++++------
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../cloud/api/collections/CreateSnapshotCmd.java | 18 +++++++------
.../cloud/api/collections/DeleteCollectionCmd.java | 2 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 18 +++++++------
.../solr/cloud/api/collections/DeleteShardCmd.java | 18 ++++++-------
.../cloud/api/collections/DeleteSnapshotCmd.java | 16 ++++++-----
.../OverseerCollectionMessageHandler.java | 2 +-
.../solr/cloud/api/collections/RestoreCmd.java | 4 ++-
.../autoscaling/sim/SimClusterStateProvider.java | 5 ++--
.../apache/solr/cloud/overseer/ReplicaMutator.java | 8 +++---
.../java/org/apache/solr/core/BlobRepository.java | 4 ++-
.../java/org/apache/solr/core/CoreContainer.java | 11 +++++---
.../src/java/org/apache/solr/core/SolrCore.java | 10 +++++++
.../java/org/apache/solr/handler/IndexFetcher.java | 21 +++++++--------
.../org/apache/solr/handler/SolrConfigHandler.java | 4 ++-
.../org/apache/solr/handler/admin/ColStatus.java | 2 +-
.../solr/handler/admin/CollectionsHandler.java | 9 ++++---
.../apache/solr/handler/admin/PrepRecoveryOp.java | 2 +-
.../solr/handler/component/HttpShardHandler.java | 8 +++---
.../org/apache/solr/schema/ManagedIndexSchema.java | 7 +++--
.../solr/search/join/ScoreJoinQParserPlugin.java | 6 +++--
.../java/org/apache/solr/servlet/HttpSolrCall.java | 15 ++++++-----
.../processor/DistributedZkUpdateProcessor.java | 12 ++++++---
.../test/org/apache/solr/cloud/AddReplicaTest.java | 10 ++++---
.../cloud/AssignBackwardCompatibilityTest.java | 4 ++-
.../solr/cloud/ChaosMonkeyShardSplitTest.java | 4 ++-
.../solr/cloud/DeleteInactiveReplicaTest.java | 2 +-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 12 +++++----
.../org/apache/solr/cloud/ForceLeaderTest.java | 16 ++++++-----
.../solr/cloud/HttpPartitionOnCommitTest.java | 25 +++++++++++------
.../org/apache/solr/cloud/HttpPartitionTest.java | 16 ++++++-----
.../cloud/LeaderFailureAfterFreshStartTest.java | 4 ++-
.../apache/solr/cloud/LeaderTragicEventTest.java | 11 ++++----
.../solr/cloud/LegacyCloudClusterPropTest.java | 7 +++--
.../org/apache/solr/cloud/NodeMutatorTest.java | 17 +++++++-----
.../apache/solr/cloud/PeerSyncReplicationTest.java | 4 ++-
.../test/org/apache/solr/cloud/RecoveryZkTest.java | 11 +++++---
.../org/apache/solr/cloud/ReplaceNodeTest.java | 6 +++--
.../cloud/SharedFSAutoReplicaFailoverTest.java | 11 ++++----
.../test/org/apache/solr/cloud/SyncSliceTest.java | 19 +++++++------
.../apache/solr/cloud/TestCloudConsistency.java | 20 ++++++++------
.../solr/cloud/TestCloudSearcherWarming.java | 2 +-
.../org/apache/solr/cloud/TestPullReplica.java | 31 +++++++++++-----------
.../solr/cloud/TestPullReplicaErrorHandling.java | 8 +++---
.../solr/cloud/TestQueryingOnDownCollection.java | 4 ++-
.../org/apache/solr/cloud/TestTlogReplica.java | 18 ++++++++-----
.../api/collections/CollectionReloadTest.java | 9 +++++--
.../collections/CollectionTooManyReplicasTest.java | 2 +-
.../api/collections/CustomCollectionTest.java | 2 +-
.../solr/cloud/api/collections/ShardSplitTest.java | 2 +-
.../solr/cloud/hdfs/HDFSCollectionsAPITest.java | 2 +-
.../core/snapshots/TestSolrCloudSnapshots.java | 10 ++++---
.../solr/update/TestInPlaceUpdatesDistrib.java | 10 ++++---
.../solr/client/solrj/cloud/DirectShardState.java | 5 ++++
.../client/solrj/cloud/ShardStateProvider.java | 4 +++
.../solrj/cloud/autoscaling/PolicyHelper.java | 4 ++-
.../client/solrj/impl/BaseCloudSolrClient.java | 2 +-
.../solrj/io/stream/FeaturesSelectionStream.java | 6 ++---
.../client/solrj/io/stream/TextLogitStream.java | 5 +++-
.../solr/client/solrj/io/stream/TopicStream.java | 27 +++++++------------
.../solr/client/solrj/io/stream/TupleStream.java | 6 +++--
.../apache/solr/common/cloud/ClusterStateUtil.java | 8 +++---
.../common/cloud/CollectionStatePredicate.java | 4 +--
.../java/org/apache/solr/common/cloud/Replica.java | 1 +
.../apache/solr/common/cloud/ZkStateReader.java | 17 +++++++-----
.../solr/cloud/AbstractDistribZkTestBase.java | 27 +++++++++++--------
.../solr/cloud/AbstractFullDistribZkTestBase.java | 7 ++---
.../java/org/apache/solr/cloud/ChaosMonkey.java | 4 ++-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 2 +-
76 files changed, 404 insertions(+), 274 deletions(-)
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
index a8c50ef..04ab54c 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -33,8 +33,8 @@ import org.apache.solr.analytics.AnalyticsRequestManager;
import org.apache.solr.analytics.AnalyticsRequestParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -113,7 +113,7 @@ public class AnalyticsShardRequestManager {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+ if(zkStateReader.getShardStateProvider(collection).isActive(replica))
shuffler.add(replica);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 456daee..1585440 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -580,7 +580,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
ClusterState clusterState = zkStateReader.getClusterState();
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
if (rep == null) return;
- if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
+ if (zkStateReader.getShardStateProvider(collection).getState(rep) != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
index 23f684e..24e3cb2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
@@ -55,6 +55,7 @@ class ExclusiveSliceProperty {
private final String property;
private final DocCollection collection;
private final String collectionName;
+ private final ZkStateReader zkStateReader;
// Key structure. For each node, list all replicas on it regardless of whether they have the property or not.
private final Map<String, List<SliceReplica>> nodesHostingReplicas = new HashMap<>();
@@ -71,8 +72,9 @@ class ExclusiveSliceProperty {
private int assigned = 0;
- ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
+ ExclusiveSliceProperty(ZkStateReader reader, ClusterState clusterState, ZkNodeProps message) {
this.clusterState = clusterState;
+ this.zkStateReader = reader;
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
@@ -109,7 +111,7 @@ class ExclusiveSliceProperty {
}
private boolean isActive(Replica replica) {
- return replica.getState() == Replica.State.ACTIVE;
+ return zkStateReader.getShardStateProvider(collectionName).getState(replica) == Replica.State.ACTIVE;
}
// Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 5a83a6c..ee834f2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -385,7 +385,7 @@ public class Overseer implements SolrCloseable {
case DELETEREPLICAPROP:
return Collections.singletonList(new ReplicaMutator(getSolrCloudManager()).deleteReplicaProperty(clusterState, message));
case BALANCESHARDUNIQUE:
- ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
+ ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(reader, clusterState, message);
if (dProp.balanceProperty()) {
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection()));
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 39af7df..d68bc16 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -772,9 +773,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(docCollection.getName());
if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
- docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
- .getState() == Replica.State.ACTIVE) {
+ ssp.getState(docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()))== Replica.State.ACTIVE) {
// this operation may take a long time, by putting replica into DOWN state, client won't query this replica
zkController.publish(coreDesc, Replica.State.DOWN);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index fc9af6f..1419efe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -655,6 +655,7 @@ public class ZkController implements Closeable {
assert cd != null;
DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName());
if (dc == null) return;
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(cd.getCollectionName());
Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
if (shard == null) return;
@@ -663,7 +664,7 @@ public class ZkController implements Closeable {
if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != shard.getLeader()) return;
int numActiveReplicas = shard.getReplicas(
- rep -> rep.getState() == Replica.State.ACTIVE
+ rep -> ssp.getState(rep) == Replica.State.ACTIVE
&& rep.getType() != Type.PULL
&& getClusterState().getLiveNodes().contains(rep.getNodeName())
).size();
@@ -1038,7 +1039,7 @@ public class ZkController implements Closeable {
for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
if (coreDescriptor.getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica)) {
Replica replica = collectionState.getReplica(coreDescriptor.getCloudDescriptor().getCoreNodeName());
- if (replica == null || replica.getState() != Replica.State.DOWN) {
+ if (replica == null || ssp.getState(replica) != Replica.State.DOWN) {
foundStates = false;
}
}
@@ -1174,7 +1175,7 @@ public class ZkController implements Closeable {
// check replica's existence in clusterstate first
try {
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
- TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+ TimeUnit.MILLISECONDS, (collectionState, ssp) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
}
@@ -1825,7 +1826,7 @@ public class ZkController implements Closeable {
AtomicReference<String> errorMessage = new AtomicReference<>();
AtomicReference<DocCollection> collectionState = new AtomicReference<>();
try {
- zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
+ zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c,ssp) -> {
collectionState.set(c);
if (c == null)
return false;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index ceec4e2..55da606 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -16,12 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.time.Instant;
@@ -31,6 +25,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.lucene.util.Version;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -56,6 +51,12 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -143,16 +144,17 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
Collection<CoreSnapshotMetaData> snapshots = snapshotMeta.getReplicaSnapshotsForShard(slice.getName());
Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(x -> x.isLeader()).findFirst();
+ ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(slice.collection);
if (leaderCore.isPresent()) {
log.info("Replica {} was the leader when snapshot {} was created.", leaderCore.get().getCoreName(), snapshotMeta.getName());
Replica r = slice.getReplica(leaderCore.get().getCoreName());
- if ((r != null) && !r.getState().equals(State.DOWN)) {
+ if ((r != null) && ssp.getState(r) != State.DOWN) {
return r;
}
}
Optional<Replica> r = slice.getReplicas().stream()
- .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
+ .filter(it -> ssp.getState(it) != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), it))
.findFirst();
if (!r.isPresent()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index bc2a0b5..4cf2722 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -322,7 +322,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
CollectionAdminParams.COLOCATED_WITH, collectionName);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
try {
- zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
+ zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState,ssp) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
} catch (TimeoutException e) {
log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
// maybe the overseer queue is backed up, we don't want to fail the create request
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index abc3597..02e3652 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -16,12 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Date;
@@ -31,16 +25,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -53,6 +48,12 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
/**
* This class implements the functionality of creating a collection level snapshot.
*/
@@ -94,12 +95,13 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
NamedList shardRequestResults = new NamedList();
Map<String, Slice> shardByCoreName = new HashMap<>();
+ ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(collectionName);
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) {
- if (replica.getState() != State.ACTIVE) {
+ if (ssp.getState(replica) != State.ACTIVE) {
log.info("Replica {} is not active. Hence not sending the createsnapshot request", replica.getCoreName());
continue; // Since replica is not active - no point sending a request.
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 648f5ba..737ef9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -147,7 +147,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection
- zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
+ zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState,ssp) -> collectionState == null);
// we can delete any remaining unique aliases
if (!aliasReferences.isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 83f9141..4709db9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -16,13 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
@@ -51,6 +45,13 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
public class DeleteReplicaCmd implements Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -222,7 +223,8 @@ public class DeleteReplicaCmd implements Cmd {
// If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
// on the command.
- if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+ ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(collectionName);
+ if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && ssp.getState(replica) != Replica.State.DOWN) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName +
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 4aca282..e0e827b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -17,14 +17,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
@@ -51,6 +43,14 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
@@ -141,7 +141,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
- zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
+ zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c,ssp) -> c.getSlice(sliceId) == null);
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
} catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 634692d..9269c75 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -16,12 +16,6 @@
*/
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
@@ -29,6 +23,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -51,6 +46,12 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
/**
* This class implements the functionality of deleting a collection level snapshot.
*/
@@ -101,9 +102,10 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
+ ShardStateProvider ssp = ocmh.zkStateReader.getShardStateProvider(collectionName);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) {
- if (replica.getState() == State.DOWN) {
+ if (ssp.getState(replica) == State.DOWN) {
continue; // Since replica is down - no point sending a request.
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 54b7f5b..13bb2da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -415,7 +415,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
try {
- zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
+ zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c,ssp) -> {
if (c == null)
return true;
Slice slice = c.getSlice(shard);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index aa5cbe9..9e81941 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
@@ -114,6 +115,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
zkStateReader.getClusterState().getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM);
+ ShardStateProvider ssp = ocmh.cloudManager.getClusterStateProvider().getShardStateProvider(restoreCollectionName);
int numShards = backupCollectionState.getActiveSlices().size();
int numNrtReplicas;
@@ -339,7 +341,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
for (Replica r : s.getReplicas()) {
String nodeName = r.getNodeName();
String coreNodeName = r.getCoreName();
- Replica.State stateRep = r.getState();
+ Replica.State stateRep = ssp.getState(r);
log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}", nodeName, coreNodeName,
stateRep.name());
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 6d731a6..8d2ea0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -2216,6 +2216,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collectionsStatesRef.set(null);
ClusterState state = getClusterState();
state.forEachCollection(coll -> {
+ ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(coll.getName());
Map<String, Object> perColl = new LinkedHashMap<>();
stats.put(coll.getName(), perColl);
perColl.put("shardsTotal", coll.getSlices().size());
@@ -2258,11 +2259,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
for (Replica r : s.getReplicas()) {
- if (r.getState() == Replica.State.ACTIVE) {
+ if (ssp.getState(r) == Replica.State.ACTIVE) {
activeReplicas++;
}
}
- Replica leader = s.getLeader();
+ Replica leader = ssp.getLeader(s);
if (leader == null) {
noLeader++;
if (!s.getReplicas().isEmpty()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index a70687a..9811cb34 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.CloudUtil;
@@ -380,16 +381,17 @@ public class ReplicaMutator {
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
Slice slice = collection.getSlice(sliceName);
Map<String, Object> sliceProps = slice.getProperties();
+ ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(collection.getName());
if (slice.getState() == Slice.State.RECOVERY) {
log.info("Shard: {} is in recovery state", sliceName);
// is this replica active?
- if (replica.getState() == Replica.State.ACTIVE) {
+ if (ssp.getState(replica) == Replica.State.ACTIVE) {
log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
// are all other replicas also active?
boolean allActive = true;
for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
if (coreNodeName.equals(entry.getKey())) continue;
- if (entry.getValue().getState() != Replica.State.ACTIVE) {
+ if (ssp.getState(entry.getValue()) != Replica.State.ACTIVE) {
allActive = false;
break;
}
@@ -409,7 +411,7 @@ public class ReplicaMutator {
log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
// this is a fellow sub shard so check if all replicas are active
for (Map.Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
- if (sliceEntry.getValue().getState() != Replica.State.ACTIVE) {
+ if (ssp.getState(sliceEntry.getValue()) != Replica.State.ACTIVE) {
allActive = false;
break outer;
}
diff --git a/solr/core/src/java/org/apache/solr/core/BlobRepository.java b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
index 59bd795..7d7e236 100644
--- a/solr/core/src/java/org/apache/solr/core/BlobRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
@@ -38,6 +38,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -238,6 +239,7 @@ public class BlobRepository {
DocCollection coll = cs.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
if (coll == null)
throw new SolrException(SERVICE_UNAVAILABLE, CollectionAdminParams.SYSTEM_COLL + " collection not available");
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(CollectionAdminParams.SYSTEM_COLL);
ArrayList<Slice> slices = new ArrayList<>(coll.getActiveSlices());
if (slices.isEmpty())
throw new SolrException(SERVICE_UNAVAILABLE, "No active slices for " + CollectionAdminParams.SYSTEM_COLL + " collection");
@@ -248,7 +250,7 @@ public class BlobRepository {
List<Replica> replicas = new ArrayList<>(slice.getReplicasMap().values());
Collections.shuffle(replicas, RANDOM);
for (Replica r : replicas) {
- if (r.getState() == Replica.State.ACTIVE) {
+ if (ssp.getState(r) == Replica.State.ACTIVE) {
if (zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))) {
replica = r;
break;
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a9db133..fc9fd17 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -49,6 +49,7 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -69,6 +70,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
@@ -1342,12 +1344,13 @@ public class CoreContainer {
case fromleader: // Recovery from leader on a CorruptedIndexException
if (isZooKeeperAware()) {
CloudDescriptor desc = dcore.getCloudDescriptor();
+ ShardStateProvider ssp = getZkController().getZkStateReader().getShardStateProvider(desc.getCollectionName());
try {
- Replica leader = getZkController().getClusterState()
+ Slice slice = getZkController().getClusterState()
.getCollection(desc.getCollectionName())
- .getSlice(desc.getShardId())
- .getLeader();
- if (leader != null && leader.getState() == State.ACTIVE) {
+ .getSlice(desc.getShardId());
+ Replica leader = ssp.getLeader(slice);
+ if (leader != null && ssp.getState(leader) == State.ACTIVE) {
log.info("Found active leader, will attempt to create fresh core and recover.");
resetIndexDirectory(dcore, coreConfig);
// the index of this core is emptied, its term should be set to 0
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 2c6b175..5210f7c 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -74,6 +74,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.RecoveryStrategy;
@@ -238,6 +239,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
public volatile boolean searchEnabled = true;
public volatile boolean indexEnabled = true;
public volatile boolean readOnly = false;
+ private ShardStateProvider shardStateProvider;
private PackageListeners packageListeners = new PackageListeners(this);
@@ -3200,6 +3202,14 @@ public final class SolrCore implements SolrInfoBean, Closeable {
});
return blobRef;
}
+ public ShardStateProvider getShardStateProvider(){
+ if(this.shardStateProvider == null) {
+ if(coreContainer.getZkController() != null){
+ this.shardStateProvider = coreContainer.getZkController().getZkStateReader().getShardStateProvider(this.getCoreDescriptor().getCloudDescriptor().getCollectionName());
+ }
+ }
+ return shardStateProvider;
+ }
/**
* Run an arbitrary task in it's own thread. This is an expert option and is
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 0f873af..75ff267 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -70,15 +70,16 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@@ -384,13 +385,17 @@ public class IndexFetcher {
try {
if (fetchFromLeader) {
assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
- Replica replica = getLeaderReplica();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+ ZkStateReader zkStateReader = solrCore.getCoreContainer().getZkController().getZkStateReader();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(cd.getCollectionName());
+ Replica replica = ssp
+ .getLeader(zkStateReader.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()));
+
if (cd.getCoreNodeName().equals(replica.getName())) {
return IndexFetchResult.EXPECTING_NON_LEADER;
}
- if (replica.getState() != Replica.State.ACTIVE) {
- log.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
+ if (ssp.getState(replica) != Replica.State.ACTIVE) {
+ log.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), ssp.getState(replica));
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
if (!solrCore.getCoreContainer().getZkController().getClusterState().liveNodesContain(replica.getNodeName())) {
@@ -692,14 +697,6 @@ public class IndexFetcher {
}
}
- private Replica getLeaderReplica() throws InterruptedException {
- ZkController zkController = solrCore.getCoreContainer().getZkController();
- CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- cd.getCollectionName(), cd.getShardId());
- return leaderReplica;
- }
-
private void cleanup(final SolrCore core, Directory tmpIndexDir,
Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
index 004da31..b4b9df0 100644
--- a/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
@@ -44,6 +44,7 @@ import org.apache.solr.api.ApiBag;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -823,6 +824,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
+ ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
if (docCollection != null && docCollection.getActiveSlices() != null && docCollection.getActiveSlices().size() > 0) {
final Collection<Slice> activeSlices = docCollection.getActiveSlices();
for (Slice next : activeSlices) {
@@ -830,7 +832,7 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
if (replicasMap != null) {
for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
Replica replica = entry.getValue();
- if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ if (ssp.isActive(replica)) {
activeReplicaCoreUrls.add(replica.getCoreUrl());
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 434905b..e22337d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -129,7 +129,7 @@ public class ColStatus {
downReplicas++;
continue;
}
- switch (r.getState()) {
+ switch (ssp.getState(r)) {
case ACTIVE:
activeReplicas++;
break;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 09fc02c..6487789 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.api.Api;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -1340,11 +1341,12 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
throw new SolrException(ErrorCode.BAD_REQUEST,
"No shard with name " + sliceId + " exists for collection " + collectionName);
}
+ ShardStateProvider ssp = handler.coreContainer.getZkController().getZkStateReader().getShardStateProvider(collectionName);
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
// if an active replica is the leader, then all is fine already
- Replica leader = slice.getLeader();
- if (leader != null && leader.getState() == State.ACTIVE) {
+ Replica leader = ssp.getLeader(slice);
+ if (leader != null && ssp.getState(leader) == State.ACTIVE) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"The shard already has an active leader. Force leader is not applicable. State: " + slice);
}
@@ -1369,7 +1371,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
clusterState = handler.coreContainer.getZkController().getClusterState();
collection = clusterState.getCollection(collectionName);
slice = collection.getSlice(sliceId);
- if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
+ Replica ldr = ssp.getLeader(slice);
+ if (ldr != null && ssp.getState(ldr) == State.ACTIVE) {
success = true;
break;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index a000477..4e58b20 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -96,7 +96,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
if (slice != null) {
final Replica replica = slice.getReplicasMap().get(coreNodeName);
if (replica != null) {
- state = replica.getState();
+ state = ssp.getState(replica);
live = n.contains(nodeName);
final Replica.State localState = cloudDescriptor.getLastPublished();
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 69da394..6e4949c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -43,8 +43,8 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.LBSolrClient;
-import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
@@ -473,7 +473,7 @@ public class HttpShardHandler extends ShardHandler {
}
};
- final List<Replica> eligibleSliceReplicas = collectEligibleReplicas(slice, clusterState, onlyNrtReplicas, isShardLeader);
+ final List<Replica> eligibleSliceReplicas = collectEligibleReplicas(rb, slice, clusterState, onlyNrtReplicas, isShardLeader);
final List<String> shardUrls = transformReplicasToShardUrls(replicaListTransformer, eligibleSliceReplicas);
@@ -514,12 +514,12 @@ public class HttpShardHandler extends ShardHandler {
}
}
- private static List<Replica> collectEligibleReplicas(Slice slice, ClusterState clusterState, boolean onlyNrtReplicas, Predicate<Replica> isShardLeader) {
+ private static List<Replica> collectEligibleReplicas(ResponseBuilder rb, Slice slice, ClusterState clusterState, boolean onlyNrtReplicas, Predicate<Replica> isShardLeader) {
final Collection<Replica> allSliceReplicas = slice.getReplicasMap().values();
final List<Replica> eligibleSliceReplicas = new ArrayList<>(allSliceReplicas.size());
for (Replica replica : allSliceReplicas) {
if (!clusterState.liveNodesContain(replica.getNodeName())
- || replica.getState() != Replica.State.ACTIVE
+ || rb.req.getCore().getShardStateProvider().getState(replica) != Replica.State.ACTIVE
|| (onlyNrtReplicas && replica.getType() == Replica.Type.PULL)) {
continue;
}
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index 57b0c90..7165089 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -49,6 +48,7 @@ import org.apache.solr.analysis.TokenizerChain;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
@@ -287,7 +287,7 @@ public final class ManagedIndexSchema extends IndexSchema {
List<String> activeReplicaCoreUrls = new ArrayList<>();
ZkStateReader zkStateReader = zkController.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
+ ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
final Slice[] activeSlices = docCollection.getActiveSlicesArr();
@@ -297,8 +297,7 @@ public final class ManagedIndexSchema extends IndexSchema {
for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
Replica replica = entry.getValue();
if (!localCoreNodeName.equals(replica.getName()) &&
- replica.getState() == Replica.State.ACTIVE &&
- liveNodes.contains(replica.getNodeName())) {
+ ssp.isActive(replica)) {
ZkCoreNodeProps replicaCoreProps = new ZkCoreNodeProps(replica);
activeReplicaCoreUrls.add(replicaCoreProps.getCoreUrl());
}
diff --git a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
index 7bd78c0..3d689bd 100644
--- a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
+++ b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
@@ -26,6 +26,7 @@ import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.JoinUtil;
import org.apache.lucene.search.join.ScoreMode;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
@@ -306,6 +307,7 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
String fromReplica = null;
String nodeName = zkController.getNodeName();
+ ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(fromIndex);
for (Slice slice : zkController.getClusterState().getCollection(fromIndex).getActiveSlicesArr()) {
if (fromReplica != null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
@@ -315,10 +317,10 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
if (replica.getNodeName().equals(nodeName)) {
fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
// found local replica, but is it Active?
- if (replica.getState() != Replica.State.ACTIVE)
+ if (ssp.getState(replica) != Replica.State.ACTIVE)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
- ") on "+nodeName+", but it is "+replica.getState());
+ ") on "+nodeName+", but it is "+ssp.getState(replica));
break;
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 9c1d357..3ffef05 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -56,6 +56,7 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.InputStreamEntity;
import org.apache.solr.api.ApiBag;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.SolrException;
@@ -927,24 +928,24 @@ public class HttpSolrCall {
return null;
}
- Set<String> liveNodes = clusterState.getLiveNodes();
+ ShardStateProvider ssp = cores.getZkController().getZkStateReader().getShardStateProvider(collectionName);
if (isPreferLeader) {
List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
- SolrCore core = randomlyGetSolrCore(liveNodes, leaderReplicas);
+ SolrCore core = randomlyGetSolrCore(ssp, leaderReplicas);
if (core != null) return core;
}
List<Replica> replicas = collection.getReplicas(cores.getZkController().getNodeName());
- return randomlyGetSolrCore(liveNodes, replicas);
+ return randomlyGetSolrCore(ssp, replicas);
}
- private SolrCore randomlyGetSolrCore(Set<String> liveNodes, List<Replica> replicas) {
+ private SolrCore randomlyGetSolrCore(ShardStateProvider ssp, List<Replica> replicas) {
if (replicas != null) {
RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
while (it.hasNext()) {
Replica replica = it.next();
- if (liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
+ if (ssp.isActive(replica)) {
SolrCore core = checkProps(replica);
if (core != null) return core;
}
@@ -1040,14 +1041,14 @@ public class HttpSolrCall {
String coreUrl;
Set<String> liveNodes = clusterState.getLiveNodes();
Collections.shuffle(slices, random);
+ ShardStateProvider ssp = this.cores.getZkController().getZkStateReader().getShardStateProvider(collectionName);
for (Slice slice : slices) {
List<Replica> randomizedReplicas = new ArrayList<>(slice.getReplicas());
Collections.shuffle(randomizedReplicas, random);
for (Replica replica : randomizedReplicas) {
- if (!activeReplicas || (liveNodes.contains(replica.getNodeName())
- && replica.getState() == Replica.State.ACTIVE)) {
+ if (!activeReplicas || ssp.isActive(replica)) {
if (byCoreName && !origCorename.equals(replica.getStr(CORE_NAME_PROP))) {
// if it's by core name, make sure they match
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 569f877..58c227b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
@@ -71,6 +72,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.GET_LEADER_RETRY_DEFAULT_TIMEOUT;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@@ -641,14 +643,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
try {
// Not equivalent to getLeaderProps, which retries to find a leader.
// Replica leader = slice.getLeader();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(collection);
+ Replica leaderReplica = ssp.getLeader(slice, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
shardId = cloudDesc.getShardId();
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ leaderReplica = ssp.getLeader(slice, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
}
}
@@ -694,7 +697,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
} else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
- } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
+ } else if ( !clusterState.getLiveNodes().contains(replica.getNodeName()) || ssp.getState(replica) == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
@@ -818,6 +821,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
+ ShardStateProvider ssp = zkController.getZkStateReader().getShardStateProvider(leaderReplica.collection);
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
@@ -847,7 +851,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
- || replica.getState() == Replica.State.DOWN) {
+ || ssp.getState(replica) == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId));
diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
index 3bfda38..fe60c6f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
@@ -16,14 +16,12 @@
*/
package org.apache.solr.cloud;
-import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
-import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
-
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashSet;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
@@ -36,6 +34,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.client.solrj.response.RequestStatusState.COMPLETED;
+import static org.apache.solr.client.solrj.response.RequestStatusState.FAILED;
+
/**
*
*/
@@ -137,6 +138,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
DocCollection coll = clusterState.getCollection(collection);
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(collection);
String sliceName = coll.getSlices().iterator().next().getName();
Collection<Replica> replicas = coll.getSlice(sliceName).getReplicas();
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName);
@@ -192,7 +194,7 @@ public class AddReplicaTest extends SolrCloudTestCase {
if (replica.getName().equals(replica2)) {
continue; // may be still recovering
}
- assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
+ assertSame(coll.toString() + "\n" + replica.toString(),ssp.getState(replica), Replica.State.ACTIVE);
}
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
index c1ba972..884fe2f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.cloud.DocCollection;
@@ -63,6 +64,7 @@ public class AssignBackwardCompatibilityTest extends SolrCloudTestCase {
int numOperations = random().nextInt(15) + 15;
int numLiveReplicas = 4;
+ ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(COLLECTION);
boolean clearedCounter = false;
for (int i = 0; i < numOperations; i++) {
@@ -78,7 +80,7 @@ public class AssignBackwardCompatibilityTest extends SolrCloudTestCase {
if (deleteReplica) {
cluster.waitForActiveCollection(COLLECTION, 1, numLiveReplicas);
DocCollection dc = getCollectionState(COLLECTION);
- Replica replica = getRandomReplica(dc.getSlice("shard1"), (r) -> r.getState() == Replica.State.ACTIVE);
+ Replica replica = getRandomReplica(dc.getSlice("shard1"), (r) -> ssp.getState(r)== Replica.State.ACTIVE);
CollectionAdminRequest.deleteReplica(COLLECTION, "shard1", replica.getName()).process(cluster.getSolrClient());
coreNames.remove(replica.getCoreName());
numLiveReplicas--;
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 50e2443..2d2b292 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.api.collections.ShardSplitTest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
@@ -224,9 +225,10 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
DocCollection collection1 = clusterState.getCollection("collection1");
Slice slice = collection1.getSlice("shard1");
Collection<Replica> replicas = slice.getReplicas();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
boolean allActive = true;
for (Replica replica : replicas) {
- if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+ if (!ssp.isActive(replica)) {
allActive = false;
break;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
index ed01b6f..b6bec01 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
@@ -78,7 +78,7 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
waitForState("Expected replica " + replica.getName() + " on down node to be removed from cluster state", collectionName, (n, c, ssp) -> {
Replica r = c.getReplica(replica.getCoreName());
- return r == null || r.getState() != Replica.State.ACTIVE;
+ return r == null || ssp.getState(r) != Replica.State.ACTIVE;
});
log.info("Removing replica {}/{} ", shard.getName(), replica.getName());
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index c284036..e188ccb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@@ -99,11 +100,12 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
cluster.waitForActiveCollection(collectionName, 2, 4);
DocCollection state = getCollectionState(collectionName);
+ ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
Slice shard = getRandomShard(state);
// don't choose the leader to shutdown, it just complicates things unneccessarily
Replica replica = getRandomReplica(shard, (r) ->
- ( r.getState() == Replica.State.ACTIVE &&
+ ( shardStateProvider.getState(r) == Replica.State.ACTIVE &&
! r.equals(shard.getLeader())));
CoreStatus coreStatus = getCoreStatus(replica);
@@ -239,7 +241,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
// don't choose the leader to shutdown, it just complicates things unneccessarily
Replica replica = getRandomReplica(shard, (r) ->
- ( r.getState() == Replica.State.ACTIVE &&
+ ( cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName).getState(r) == Replica.State.ACTIVE &&
! r.equals(shard.getLeader())));
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
@@ -376,7 +378,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
replica1Jetty.stop();
waitForNodeLeave(replica1JettyNodeName);
waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState, ssp)
- -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+ -> ssp.getState(collectionState.getSlice("shard1").getReplica(replica1.getName())) == DOWN);
replica1Jetty.start();
waitingForReplicaGetDeleted.acquire();
} finally {
@@ -405,8 +407,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
waitForState("Expected new active leader", collectionName, (liveNodes, collectionState, ssp) -> {
Slice shard = collectionState.getSlice("shard1");
- Replica newLeader = shard.getLeader();
- return newLeader != null && newLeader.getState() == Replica.State.ACTIVE && !newLeader.getName().equals(latestLeader.getName());
+ Replica newLeader = ssp.getLeader(shard);
+ return newLeader != null && ssp.getState(newLeader) == Replica.State.ACTIVE && !newLeader.getName().equals(latestLeader.getName());
});
leaderJetty.start();
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 7509e62..5950758 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -98,7 +99,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
waitForState(testCollectionName, leader.getName(), State.DOWN, 60000);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
- int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ int numActiveReplicas = getNumberOfActiveReplicas(cloudClient.getZkStateReader().getShardStateProvider(testCollectionName), clusterState, testCollectionName, SHARD1);
assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
"; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
@@ -129,9 +130,10 @@ public class ForceLeaderTest extends HttpPartitionTest {
Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
assertNotNull(newLeader);
// leader is active
- assertEquals(State.ACTIVE, newLeader.getState());
+ ShardStateProvider shardStateProvider = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
+ assertEquals(State.ACTIVE, shardStateProvider.getState(newLeader));
- numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ numActiveReplicas = getNumberOfActiveReplicas(shardStateProvider, clusterState, testCollectionName, SHARD1);
assertEquals(2, numActiveReplicas);
// Assert that indexing works again
@@ -204,7 +206,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
boolean allDown = true;
for (Replica replica : clusterState.getCollection(collectionName).getSlice(shard).getReplicas()) {
- if (replica.getState() != State.DOWN) {
+ if (zkController.getZkStateReader().getShardStateProvider(collectionName).getState(replica) != State.DOWN) {
allDown = false;
}
}
@@ -244,7 +246,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
cloudClient.getZkStateReader().forceUpdateCollection(collection);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
log.info("After bringing back leader: " + clusterState.getCollection(collection).getSlice(SHARD1));
- int numActiveReplicas = getNumberOfActiveReplicas(clusterState, collection, SHARD1);
+ int numActiveReplicas = getNumberOfActiveReplicas(cloudClient.getZkStateReader().getShardStateProvider(collection), clusterState, collection, SHARD1);
assertEquals(1+notLeaders.size(), numActiveReplicas);
log.info("Sending doc "+docid+"...");
sendDoc(docid);
@@ -271,11 +273,11 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
- private int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
+ private int getNumberOfActiveReplicas(ShardStateProvider ssp, ClusterState clusterState, String collection, String sliceId) {
int numActiveReplicas = 0;
// Assert all replicas are active
for (Replica rep : clusterState.getCollection(collection).getSlice(sliceId).getReplicas()) {
- if (rep.getState().equals(State.ACTIVE)) {
+ if (ssp.getState(rep)== State.ACTIVE) {
numActiveReplicas++;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
index 8df6175..90bc857 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
@@ -16,22 +16,25 @@
*/
package org.apache.solr.cloud;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
import org.apache.http.NoHttpResponseException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.util.RTimer;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-
public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -75,6 +78,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
String testCollectionName = "c8n_2x2_commits";
createCollection(testCollectionName, "conf1", 2, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
+ DocCollection coll = cloudClient.getClusterStateProvider().getCollection(testCollectionName);
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
@@ -85,6 +89,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
log.info("All replicas active for "+testCollectionName);
+ ShardStateProvider shardStateProvider = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
// let's put the leader in its own partition, no replicas can contact it now
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
log.info("Creating partition to leader at "+leader.getCoreUrl());
@@ -98,8 +103,9 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
Thread.sleep(sleepMsBeforeHealPartition);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
- leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+ leader = shardStateProvider.getLeader(coll.getSlice("shard1"), ZkStateReader.GET_LEADER_RETRY_DEFAULT_TIMEOUT) ;
+ assertSame("Leader was not active", Replica.State.ACTIVE, shardStateProvider.getState(leader));
log.info("Healing partitioned replica at "+leader.getCoreUrl());
leaderProxy.reopen();
@@ -118,6 +124,9 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
String testCollectionName = "c8n_1x3_commits";
createCollection(testCollectionName, "conf1", 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
+ DocCollection coll = cloudClient.getZkStateReader().getCollection(testCollectionName);
+
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
@@ -129,7 +138,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
log.info("All replicas active for "+testCollectionName);
// let's put the leader in its own partition, no replicas can contact it now
- Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ Replica leader = ssp.getLeader(coll.getSlice("shard1"));
log.info("Creating partition to leader at "+leader.getCoreUrl());
SocketProxy leaderProxy = getProxyForReplica(leader);
@@ -141,7 +150,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+ assertSame("Leader was not active", Replica.State.ACTIVE, ssp.getState(leader));
log.info("Healing partitioned replica at "+leader.getCoreUrl());
leaderProxy.reopen();
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 7a11262..3807c20 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.solr.cloud;
-import static org.apache.solr.common.cloud.Replica.State.DOWN;
-import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
-
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -32,11 +29,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -65,6 +64,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
+import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
+
/**
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
@@ -320,7 +322,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
Slice slice = slices.iterator().next();
Replica partitionedReplica = slice.getReplica(replicaName);
- replicaState = partitionedReplica.getState();
+ replicaState = cloudClient.getZkStateReader().getShardStateProvider(collection).getState(partitionedReplica);
if (replicaState == state) return;
}
assertEquals("Timeout waiting for state "+ state +" of replica " + replicaName + ", current state " + replicaState,
@@ -469,10 +471,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
ZkStateReader zkr = cloudClient.getZkStateReader();
ClusterState cs = zkr.getClusterState();
assertNotNull(cs);
+ ShardStateProvider ssp = zkr.getShardStateProvider(testCollectionName);
for (Slice shard : cs.getCollection(testCollectionName).getActiveSlices()) {
if (shard.getName().equals(shardId)) {
for (Replica replica : shard.getReplicas()) {
- final Replica.State state = replica.getState();
+ final Replica.State state = ssp.getState(replica);
if (state == Replica.State.ACTIVE || state == Replica.State.RECOVERING) {
activeReplicas.put(replica.getName(), replica);
}
@@ -593,6 +596,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
boolean allReplicasUp = false;
long waitMs = 0L;
long maxWaitMs = maxWaitSecs * 1000L;
+ ShardStateProvider ssp = zkr.getShardStateProvider(testCollectionName);
while (waitMs < maxWaitMs && !allReplicasUp) {
cs = cloudClient.getZkStateReader().getClusterState();
assertNotNull(cs);
@@ -607,7 +611,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
if (!replicasToCheck.contains(replica.getName()))
continue;
- final Replica.State state = replica.getState();
+ final Replica.State state = ssp.getState(replica);
if (state != Replica.State.ACTIVE) {
log.info("Replica " + replica.getName() + " is currently " + state);
allReplicasUp = false;
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
index 353c0c9..d4c78ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
@@ -32,6 +32,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
import org.apache.solr.common.SolrInputDocument;
@@ -207,6 +208,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection collection1 = clusterState.getCollection("collection1");
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
Slice slice = collection1.getSlice("shard1");
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;
@@ -221,7 +223,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
.collect(Collectors.toList());
for (Replica replica : replicasToCheck) {
- if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+ if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica) != Replica.State.ACTIVE) {
allActive = false;
break;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
index 69c14cb..8004d7b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
@@ -97,7 +98,7 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), collection, 120000);
Slice shard = getCollectionState(collection).getSlice("shard1");
assertNotSame(shard.getLeader().getNodeName(), oldLeader.getNodeName());
- assertEquals(getNonLeader(shard).getNodeName(), oldLeader.getNodeName());
+ assertEquals(getNonLeader(shard, cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection) ).getNodeName(), oldLeader.getNodeName());
for (String id : addedIds) {
assertNotNull(cluster.getSolrClient().getById(collection,id));
@@ -157,9 +158,9 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
return oldLeader;
}
- private Replica getNonLeader(Slice slice) {
+ private Replica getNonLeader(Slice slice, ShardStateProvider ssp) {
if (slice.getReplicas().size() <= 1) return null;
- return slice.getReplicas(rep -> !rep.getName().equals(slice.getLeader().getName())).get(0);
+ return slice.getReplicas(rep -> !rep.getName().equals(ssp.getLeader(slice).getName())).get(0);
}
@Test
@@ -177,11 +178,11 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
JettySolrRunner otherReplicaJetty = null;
if (numReplicas == 2) {
Slice shard = getCollectionState(collection).getSlice("shard1");
- otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard));
+ otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard, cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection)));
log.info("Stop jetty node : {} state:{}", otherReplicaJetty.getBaseUrl(), getCollectionState(collection));
otherReplicaJetty.stop();
cluster.waitForJettyToStop(otherReplicaJetty);
- waitForState("Timeout waiting for replica get down", collection, (liveNodes, collectionState, ssp) -> getNonLeader(collectionState.getSlice("shard1")).getState() != Replica.State.ACTIVE);
+ waitForState("Timeout waiting for replica get down", collection, (liveNodes, collectionState, ssp) -> ssp.getState(getNonLeader(collectionState.getSlice("shard1"), ssp)) != Replica.State.ACTIVE);
}
Replica oldLeader = corruptLeader(collection, new ArrayList<>());
diff --git a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
index a91d5f7..37f93d0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.ClusterProperties;
@@ -124,10 +125,12 @@ public class LegacyCloudClusterPropTest extends SolrCloudTestCase {
}
private void checkCollectionActive(String coll) {
- assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
DocCollection docColl = getCollectionState(coll);
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(coll);
for (Replica rep : docColl.getReplicas()) {
- if (rep.getState() == Replica.State.ACTIVE) return;
+ if (ssp.getState(rep) == Replica.State.ACTIVE) return;
}
fail("Replica was not active for collection " + coll);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index a446f29..4412cb5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.solr.SolrTestCaseJ4Test;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.cloud.overseer.NodeMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.cloud.ClusterState;
@@ -48,6 +49,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
//Collection2: 1 shard X 1 replica = replica1 on node2
ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2", 1, 1, NODE1, NODE2);
ClusterState clusterState = reader.getClusterState();
+ ShardStateProvider ssp = reader.getShardStateProvider("collection1");
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
@@ -56,8 +58,8 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
List<ZkWriteCommand> writes = nm.downNode(clusterState, props);
assertEquals(writes.size(), 1);
assertEquals(writes.get(0).name, "collection1");
- assertEquals(writes.get(0).collection.getReplica("replica1").getState(), Replica.State.DOWN);
- assertEquals(writes.get(0).collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
+ assertEquals(ssp.getState(writes.get(0).collection.getReplica("replica1")), Replica.State.DOWN);
+ assertEquals(ssp.getState(writes.get(0).collection.getReplica("replica2")), Replica.State.ACTIVE);
reader.close();
//We use 3 nodes with maxShardsPerNode as 1
@@ -68,6 +70,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
clusterState = reader.getClusterState();
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
+ ssp = reader.getShardStateProvider("collection2");
assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
@@ -79,12 +82,12 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
assertEquals(writes.size(), 2);
for (ZkWriteCommand write : writes) {
if (write.name.equals("collection1")) {
- assertEquals(write.collection.getReplica("replica1").getState(), Replica.State.DOWN);
- assertEquals(write.collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
+ assertEquals(ssp.getState(write.collection.getReplica("replica1")), Replica.State.DOWN);
+ assertEquals(ssp.getState(write.collection.getReplica("replica2")), Replica.State.ACTIVE);
} else if (write.name.equals("collection3")) {
- assertEquals(write.collection.getReplica("replica5").getState(), Replica.State.DOWN);
- assertEquals(write.collection.getReplica("replica6").getState(), Replica.State.ACTIVE);
- assertEquals(write.collection.getReplica("replica7").getState(), Replica.State.ACTIVE);
+ assertEquals(ssp.getState(write.collection.getReplica("replica5")), Replica.State.DOWN);
+ assertEquals(ssp.getState(write.collection.getReplica("replica6")), Replica.State.ACTIVE);
+ assertEquals(ssp.getState(write.collection.getReplica("replica7")), Replica.State.ACTIVE);
} else {
fail("No other collection needs to be changed");
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index 5169d22..eea5cdf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
import org.apache.solr.common.SolrInputDocument;
@@ -339,6 +340,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection collection1 = clusterState.getCollection("collection1");
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider("collection1");
Slice slice = collection1.getSlice("shard1");
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;
@@ -354,7 +356,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
.collect(Collectors.toList());
for (Replica replica : replicasToCheck) {
- if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+ if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica) != Replica.State.ACTIVE) {
allActive = false;
break;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
index 5693330..a8ec1c9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -102,7 +103,8 @@ public class RecoveryZkTest extends SolrCloudTestCase {
// bring shard replica down
DocCollection state = getCollectionState(collection);
- Replica leader = state.getLeader("shard1");
+ ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
+ Replica leader = ssp.getLeader(state.getSlice("shard1"));
Replica replica = getRandomReplica(state.getSlice("shard1"), (r) -> leader != r);
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
@@ -131,12 +133,13 @@ public class RecoveryZkTest extends SolrCloudTestCase {
// test that leader and replica have same doc count
state = getCollectionState(collection);
- assertShardConsistency(state.getSlice("shard1"), true);
+
+ assertShardConsistency(ssp, state.getSlice("shard1"), true);
}
- private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
- List<Replica> replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE);
+ private void assertShardConsistency(ShardStateProvider ssp, Slice shard, boolean expectDocs) throws Exception {
+ List<Replica> replicas = shard.getReplicas(r -> ssp.getState(r) == Replica.State.ACTIVE);
long[] numCounts = new long[replicas.size()];
int i = 0;
for (Replica replica : replicas) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 0412330..f46d608 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -89,6 +90,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
cluster.waitForActiveCollection(coll, 5, 5 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(coll);
log.debug("### Before decommission: " + collection);
log.info("excluded_node : {} ", emptyNode);
createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient);
@@ -159,13 +161,13 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
});
assertFalse(newReplicas.isEmpty());
for (Replica r : newReplicas) {
- assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
+ assertEquals(r.toString(), Replica.State.ACTIVE, ssp.getState(r));
}
// make sure all replicas on emptyNode are not active
replicas = collection.getReplicas(emptyNode);
if (replicas != null) {
for (Replica r : replicas) {
- assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
+ assertFalse(r.toString(), Replica.State.ACTIVE.equals(ssp.getState(r)));
}
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
index 735cc20..9090392 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
@@ -32,13 +32,15 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
-import com.carrotsearch.randomizedtesting.annotations.Nightly;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
@@ -58,16 +60,14 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -398,12 +398,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
while (!timeOut.hasTimedOut()) {
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
Collection<Slice> slices = clusterState.getCollection(collection).getActiveSlices();
+ ShardStateProvider ssp = cloudClient.getClusterStateProvider().getShardStateProvider(collection);
if (slices.size() == numSlices) {
boolean isMatch = true;
for (Slice slice : slices) {
int count = 0;
for (Replica replica : slice.getReplicas()) {
- if (replica.getState() == Replica.State.ACTIVE && clusterState.liveNodesContain(replica.getNodeName())) {
+ if (ssp.isActive(replica)) {
count++;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
index c48f22e..7f78516 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
@@ -16,10 +16,18 @@
*/
package org.apache.solr.cloud;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -33,13 +41,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
/**
* Test sync phase that occurs when Leader goes down and a new Leader is
* elected.
@@ -217,11 +218,13 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection collection1 = clusterState.getCollection("collection1");
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection1.getName());
+
Slice slice = collection1.getSlice("shard1");
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;
for (Replica replica : replicas) {
- if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+ if (!clusterState.liveNodesContain(replica.getNodeName()) || ssp.getState(replica) != Replica.State.ACTIVE) {
allActive = false;
break;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
index 2f68ad9..56a071d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -155,16 +156,16 @@ public class TestCloudConsistency extends SolrCloudTestCase {
j2.stop();
cluster.waitForJettyToStop(j1);
cluster.waitForJettyToStop(j2);
-
+
waitForState("", collection, (liveNodes, collectionState, ssp) ->
collectionState.getSlice("shard1").getReplicas().stream()
- .filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
+ .filter(replica -> ssp.getState(replica) == Replica.State.DOWN).count() == 2);
addDocs(collection, 1, docId);
JettySolrRunner j3 = cluster.getJettySolrRunner(0);
j3.stop();
cluster.waitForJettyToStop(j3);
- waitForState("", collection, (liveNodes, collectionState, ssp) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+ waitForState("", collection, (liveNodes, collectionState, ssp) -> ssp.getState(collectionState.getReplica(leader.getName())) == Replica.State.DOWN);
cluster.getJettySolrRunner(1).start();
cluster.getJettySolrRunner(2).start();
@@ -173,9 +174,11 @@ public class TestCloudConsistency extends SolrCloudTestCase {
cluster.waitForNode(j2, 30);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
+
while (!timeOut.hasTimedOut()) {
- Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader();
- if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+ Replica newLeader = shardStateProvider.getLeader(getCollectionState(collection).getSlice("shard1"));
+ if (newLeader != null && !newLeader.getName().equals(leader.getName()) && shardStateProvider.getState(newLeader) == Replica.State.ACTIVE) {
fail("Out of sync replica became leader " + newLeader);
}
}
@@ -212,12 +215,13 @@ public class TestCloudConsistency extends SolrCloudTestCase {
proxies.get(cluster.getJettySolrRunner(i)).reopen();
}
waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState, ssp)
- -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+ -> ssp.getState(collectionState.getReplica(leader.getName())) == Replica.State.DOWN);
+ ShardStateProvider shardStateProvider = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collection);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
- Replica newLeader = getCollectionState(collection).getLeader("shard1");
- if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+ Replica newLeader = shardStateProvider.getLeader(getCollectionState(collection).getSlice("shard1"));
+ if (newLeader != null && !newLeader.getName().equals(leader.getName()) && shardStateProvider.getState(newLeader) == Replica.State.ACTIVE) {
fail("Out of sync replica became leader " + newLeader);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
index 5e5a11e..b25ccb3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudSearcherWarming.java
@@ -197,7 +197,7 @@ public class TestCloudSearcherWarming extends SolrCloudTestCase {
CollectionStatePredicate collectionStatePredicate = (liveNodes, collectionState, ssp) -> {
for (Replica r : collectionState.getReplicas()) {
if (r.getNodeName().equals(oldNodeName.get())) {
- return r.getState() == Replica.State.DOWN;
+ return ssp.getState(r) == Replica.State.DOWN;
}
}
return false;
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index e8abffe..e9fbf66 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -333,9 +333,9 @@ public class TestPullReplica extends SolrCloudTestCase {
if (r == null) {
return false;
}
- statesSeen.add(r.getState());
- log.info("CollectionStateWatcher saw state: {}", r.getState());
- return r.getState() == Replica.State.ACTIVE;
+ statesSeen.add(ssp.getState(r));
+ log.info("CollectionStateWatcher saw state: {}", ssp.getState(r));
+ return ssp.getState(r) == Replica.State.ACTIVE;
});
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.PULL).process(cluster.getSolrClient());
waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
@@ -509,10 +509,10 @@ public class TestPullReplica extends SolrCloudTestCase {
DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
assertEquals(1, docCollection.getSlices().size());
- waitForNumDocsInAllActiveReplicas(0);
+ waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 0);
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
cluster.getSolrClient().commit(collectionName);
- waitForNumDocsInAllActiveReplicas(1);
+ waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 1);
JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
pullReplicaJetty.stop();
@@ -522,20 +522,20 @@ public class TestPullReplica extends SolrCloudTestCase {
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
cluster.getSolrClient().commit(collectionName);
- waitForNumDocsInAllActiveReplicas(2);
+ waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 2);
pullReplicaJetty.start();
waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
- waitForNumDocsInAllActiveReplicas(2);
+ waitForNumDocsInAllActiveReplicas(cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName), 2);
}
public void testSearchWhileReplicationHappens() {
}
- private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
+ private void waitForNumDocsInAllActiveReplicas(ShardStateProvider ssp, int numDocs) throws IOException, SolrServerException, InterruptedException {
DocCollection docCollection = getCollectionState(collectionName);
- waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()));
+ waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> ssp.getState(r) == Replica.State.ACTIVE).collect(Collectors.toList()));
}
private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas) throws IOException, SolrServerException, InterruptedException {
@@ -584,14 +584,15 @@ public class TestPullReplica extends SolrCloudTestCase {
if (updateCollection) {
cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
}
+ ShardStateProvider ssp = cluster.getSolrClient().getZkStateReader().getShardStateProvider(collectionName);
DocCollection docCollection = getCollectionState(collectionName);
assertNotNull(docCollection);
assertEquals("Unexpected number of writer replicas: " + docCollection, numNrtReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of active replicas: " + docCollection, numTlogReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
return docCollection;
}
@@ -601,13 +602,13 @@ public class TestPullReplica extends SolrCloudTestCase {
private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
return (liveNodes, collectionState, ssp) -> {
for (Replica r:collectionState.getReplicas()) {
- if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+ if (ssp.getState(r) != Replica.State.DOWN && ssp.getState(r) != Replica.State.ACTIVE) {
return false;
}
- if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+ if (ssp.getState(r) == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
return false;
}
- if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+ if (ssp.getState(r) == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
return false;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
index eb4e26a..ab49c2c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -32,6 +32,7 @@ import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -273,13 +274,14 @@ public void testCantConnectToPullReplica() throws Exception {
cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
}
DocCollection docCollection = getCollectionState(collectionName);
+ ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
assertNotNull(docCollection);
assertEquals("Unexpected number of writer replicas: " + docCollection, numWriter,
- docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of pull replicas: " + docCollection, numPassive,
- docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of active replicas: " + docCollection, numActive,
- docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
return docCollection;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java b/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
index 1cd70f4..a6e6edd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestQueryingOnDownCollection.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
@@ -74,9 +75,10 @@ public class TestQueryingOnDownCollection extends SolrCloudTestCase {
downAllReplicas();
// assert all replicas are in down state
+ ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(COLLECTION_NAME);
List<Replica> replicas = getCollectionState(COLLECTION_NAME).getReplicas();
for (Replica replica: replicas){
- assertEquals(replica.getState(), Replica.State.DOWN);
+ assertEquals(ssp.getState(replica), Replica.State.DOWN);
}
// assert all nodes as active
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
index 5e59b7c..3c845aa 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
@@ -766,7 +766,8 @@ public class TestTlogReplica extends SolrCloudTestCase {
private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException {
DocCollection docCollection = getCollectionState(collectionName);
- waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()), timeout);
+ ShardStateProvider ssp = cluster.getSolrClient().getClusterStateProvider().getShardStateProvider(collectionName);
+ waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> ssp.getState(r) == Replica.State.ACTIVE).collect(Collectors.toList()), timeout);
}
private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, int timeout) throws IOException, SolrServerException, InterruptedException {
@@ -815,17 +816,19 @@ public class TestTlogReplica extends SolrCloudTestCase {
}
private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+
if (updateCollection) {
cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
}
+ ShardStateProvider ssp = cluster.getSolrClient().getZkStateReader().getShardStateProvider(collectionName);
DocCollection docCollection = getCollectionState(collectionName);
assertNotNull(docCollection);
assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas,
- docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || ssp.getState(r) == Replica.State.ACTIVE).count());
return docCollection;
}
@@ -835,13 +838,14 @@ public class TestTlogReplica extends SolrCloudTestCase {
private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
return (liveNodes, collectionState, ssp) -> {
for (Replica r:collectionState.getReplicas()) {
- if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+ if (ssp.getState(r) != Replica.State.DOWN && ssp.getState(r) != Replica.State.ACTIVE) {
return false;
}
- if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+ if (ssp.getState(r) == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
return false;
}
- if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+ if (ssp.getState(r) == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+
return false;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
index 041ccec..02e445f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
@@ -23,6 +23,7 @@ import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.RetryUtil;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -53,8 +54,12 @@ public class CollectionReloadTest extends SolrCloudTestCase {
CollectionAdminRequest.createCollection(testCollectionName, "conf", 1, 1)
.process(cluster.getSolrClient());
+
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
Replica leader
- = cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", DEFAULT_TIMEOUT);
+ = zkStateReader.getShardStateProvider(testCollectionName)
+ .getLeader(zkStateReader.getCollection(testCollectionName)
+ .getSlice("shard1"), DEFAULT_TIMEOUT);
long coreStartTime = getCoreStatus(leader).getCoreStartTime().getTime();
CollectionAdminRequest.reloadCollection(testCollectionName).process(cluster.getSolrClient());
@@ -77,7 +82,7 @@ public class CollectionReloadTest extends SolrCloudTestCase {
waitForState("Timed out waiting for core to re-register as ACTIVE after session expiry", testCollectionName, (n, c, ssp) -> {
log.info("Collection state: {}", c.toString());
Replica expiredReplica = c.getReplica(leader.getName());
- return expiredReplica.getState() == Replica.State.ACTIVE && c.getZNodeVersion() > initialStateVersion;
+ return ssp.getState(expiredReplica) == Replica.State.ACTIVE && c.getZNodeVersion() > initialStateVersion;
});
log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!");
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
index 04c39ec..1d8860c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
@@ -110,7 +110,7 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
// wait for recoveries to finish, for a clean shutdown - see SOLR-9645
waitForState("Expected to see all replicas active", collectionName, (n, c, ssp) -> {
for (Replica r : c.getReplicas()) {
- if (r.getState() != Replica.State.ACTIVE)
+ if (ssp.getState(r) != Replica.State.ACTIVE)
return false;
}
return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
index 6e8c873..b295a1f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CustomCollectionTest.java
@@ -107,7 +107,7 @@ public class CustomCollectionTest extends SolrCloudTestCase {
if (c.getSlice("x") == null)
return false;
for (Replica r : c.getSlice("x")) {
- if (r.getState() != Replica.State.ACTIVE)
+ if (ssp.getState(r) != Replica.State.ACTIVE)
return false;
}
return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 95273ea..d049af6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -227,7 +227,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice.getReplicas().size() == 2) {
- if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+ if (slice.getReplicas().stream().noneMatch(r -> ssp.getState(r) == Replica.State.RECOVERING)) {
// we see replicas and none of them are recovering
newReplicaLatch.countDown();
return true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
index c6bcf54..333fc55 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
@@ -79,7 +79,7 @@ public class HDFSCollectionsAPITest extends SolrCloudTestCase {
jettySolrRunner.stop();
waitForState("", collection, (liveNodes, collectionState, ssp) -> {
Replica replica = collectionState.getSlice("shard1").getReplicas().iterator().next();
- return replica.getState() == Replica.State.DOWN;
+ return ssp.getState(replica) == Replica.State.DOWN;
});
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
index f0bae3b..42bc64b 100644
--- a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
@@ -16,8 +16,6 @@
*/
package org.apache.solr.core.snapshots;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,10 +25,11 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.ListSnapshots;
@@ -53,6 +52,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
@SolrTestCaseJ4.SuppressSSL // Currently unknown why SSL does not work with this test
@Slow
public class TestSolrCloudSnapshots extends SolrCloudTestCase {
@@ -112,9 +113,10 @@ public class TestSolrCloudSnapshots extends SolrCloudTestCase {
// Figure out if at-least one replica is "down".
DocCollection collState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ ShardStateProvider ssp = solrClient.getZkStateReader().getShardStateProvider(collectionName);
for (Slice s : collState.getSlices()) {
for (Replica replica : s.getReplicas()) {
- if (replica.getState() == State.DOWN) {
+ if (ssp.getState(replica) == State.DOWN) {
stoppedCoreName = Optional.of(replica.getCoreName());
}
}
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index 88ef94d..dcfb144 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -37,6 +37,7 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -994,10 +995,11 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
Thread.sleep(10);
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState state = cloudClient.getZkStateReader().getClusterState();
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
int numActiveReplicas = 0;
for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
- if (rep.getState().equals(Replica.State.ACTIVE))
+ if (ssp.getState(rep).equals(Replica.State.ACTIVE))
numActiveReplicas++;
assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
@@ -1066,11 +1068,12 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
Thread.sleep(10);
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState state = cloudClient.getZkStateReader().getClusterState();
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
int numActiveReplicas = 0;
for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
- if (rep.getState().equals(Replica.State.ACTIVE))
+ if (ssp.getState(rep).equals(Replica.State.ACTIVE))
numActiveReplicas++;
}
assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
@@ -1320,10 +1323,11 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
Thread.sleep(10);
cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState state = cloudClient.getZkStateReader().getClusterState();
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(DEFAULT_COLLECTION);
int numActiveReplicas = 0;
for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
- if (rep.getState().equals(Replica.State.ACTIVE))
+ if (ssp.getState(rep).equals(Replica.State.ACTIVE))
numActiveReplicas++;
assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
index 16b610f..b2f8f18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DirectShardState.java
@@ -54,4 +54,9 @@ public class DirectShardState implements ShardStateProvider {
public boolean isActive(Slice slice) {
return slice.getState() == Slice.State.ACTIVE;
}
+
+ @Override
+ public Replica getLeader(Slice slice, int timeout) throws InterruptedException {
+ throw new RuntimeException("Not implemented");//TODO
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
index 74d9d74..3a5ab8e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardStateProvider.java
@@ -29,6 +29,10 @@ public interface ShardStateProvider {
Replica getLeader(Slice slice);
+ /** Get the leader of the slice. Wait for one if there is no leader
+ */
+ Replica getLeader(Slice slice, int timeout) throws InterruptedException;
+
boolean isActive(Replica replica);
boolean isActive(Slice slice);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 955295a..954b9c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -37,6 +37,7 @@ import java.util.function.BiPredicate;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
@@ -295,10 +296,11 @@ public class PolicyHelper {
private static void addMissingReplicas(SolrCloudManager cloudManager, Suggestion.Ctx ctx) throws IOException {
cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> coll.forEach(slice -> {
+ ShardStateProvider ssp = cloudManager.getClusterStateProvider().getShardStateProvider(coll.getName());
if (!ctx.needMore()) return;
ReplicaCount replicaCount = new ReplicaCount();
slice.forEach(replica -> {
- if (replica.getState() == Replica.State.ACTIVE || replica.getState() == Replica.State.RECOVERING) {
+ if (ssp.getState(replica) == Replica.State.ACTIVE || ssp.getState(replica) == Replica.State.RECOVERING) {
replicaCount.increment(replica.getType());
}
});
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 2f6a48a..4705554 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -411,7 +411,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
getClusterStateProvider().connect();
- assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+ assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, (d, ssp) -> predicate.test(d));
}
/**
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 22b807f..1fdd5eb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -28,12 +28,12 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -260,16 +260,16 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
List<String> baseUrls = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ if(ssp.isActive(replica)) {
shuffler.add(replica);
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index a493f5a..9b84137 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.ClassificationEvaluation;
@@ -346,12 +347,14 @@ public class TextLogitStream extends TupleStream implements Expressible {
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
+
List<String> baseUrls = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ if(ssp.isActive(replica)) {
shuffler.add(replica);
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index b42ab77..20297e8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -25,15 +25,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Random;
-import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
@@ -50,7 +50,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -397,16 +396,13 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
- ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
-
for(Slice slice : slices) {
String sliceName = slice.getName();
long checkpoint;
if(initialCheckpoint > -1) {
checkpoint = initialCheckpoint;
} else {
- checkpoint = getCheckpoint(slice, liveNodes);
+ checkpoint = getCheckpoint(zkStateReader.getShardStateProvider(this.collection), slice);
}
this.checkpoints.put(sliceName, checkpoint);
@@ -414,7 +410,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
//Gets the highest version number for the slice.
- private long getCheckpoint(Slice slice, Set<String> liveNodes) throws IOException {
+ private long getCheckpoint(ShardStateProvider ssp, Slice slice) throws IOException {
Collection<Replica> replicas = slice.getReplicas();
long checkpoint = -1;
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -423,7 +419,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
params.set(DISTRIB, "false");
params.set("rows", 1);
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ if(ssp.isActive(replica)) {
String coreUrl = replica.getCoreUrl();
SolrStream solrStream = new SolrStream(coreUrl, params);
@@ -476,14 +472,12 @@ public class TopicStream extends CloudSolrStream implements Expressible {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
- ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
-
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(checkpointCollection);
OUTER:
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
+ if(ssp.isActive(replica)){
HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
try {
SolrDocument doc = httpClient.getById(id);
@@ -518,10 +512,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
mParams.set("fl", fl);
Random random = new Random();
-
- ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
-
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(this.collection);
for(Slice slice : slices) {
ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
long checkpoint = checkpoints.get(slice.getName());
@@ -529,7 +520,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+ if(ssp.isActive(replica))
shuffler.add(replica);
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 086bddd..a5c6c49 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -24,11 +24,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.Map;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -132,6 +133,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
//SolrCloud Sharding
CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
ClusterState clusterState = zkStateReader.getClusterState();
Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -139,7 +141,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+ if(ssp.isActive(replica))
shuffler.add(replica);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
index 5e61bc1..a6ee609 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
@@ -17,12 +17,12 @@
package org.apache.solr.common.cloud;
import java.lang.invoke.MethodHandles;
-
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.slf4j.Logger;
@@ -61,6 +61,7 @@ public class ClusterStateUtil {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
while (!success && System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
@@ -81,7 +82,7 @@ public class ClusterStateUtil {
for (Replica replica : replicas) {
// on a live node?
final boolean live = clusterState.liveNodesContain(replica.getNodeName());
- final boolean isActive = replica.getState() == Replica.State.ACTIVE;
+ final boolean isActive = ssp.getState(replica) == Replica.State.ACTIVE;
if (!live || !isActive) {
// fail
success = false;
@@ -219,11 +220,12 @@ public class ClusterStateUtil {
public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
Slice[] slices;
slices = zkStateReader.getClusterState().getCollection(collection).getActiveSlicesArr();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
int liveAndActive = 0;
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
- boolean active = replica.getState() == Replica.State.ACTIVE;
+ boolean active = ssp.getState(replica)== Replica.State.ACTIVE;
if (live && active) {
liveAndActive++;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
index d3ad502..06211da 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java
@@ -19,7 +19,7 @@ package org.apache.solr.common.cloud;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
+import java.util.function.BiPredicate;
import org.apache.solr.client.solrj.cloud.ShardStateProvider;
@@ -27,7 +27,7 @@ import org.apache.solr.client.solrj.cloud.ShardStateProvider;
* Interface to determine if a set of liveNodes and a collection's state matches some expecatations.
*
* @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
- * @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
+ * @see ZkStateReader#waitForState(String, long, TimeUnit, BiPredicate)
*/
public interface CollectionStatePredicate {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 21d59fe..f70a815 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -168,6 +168,7 @@ public class Replica extends ZkNodeProps {
}
/** Returns the {@link State} of this replica. */
+ @Deprecated
public State getState() {
return state;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 352a716..4e95aa6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -158,8 +159,7 @@ public class ZkStateReader implements SolrCloseable {
protected volatile ClusterState clusterState;
private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
- private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));
- ;
+ public static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));
public static final String LEADER_ELECT_ZKNODE = "leader_elect";
@@ -219,7 +219,12 @@ public class ZkStateReader implements SolrCloseable {
private Set<ClusterPropertiesListener> clusterPropertiesListeners = ConcurrentHashMap.newKeySet();
- private final ShardStateProvider directReplicaState = new DirectShardState(s -> liveNodes.contains(s));
+ private final ShardStateProvider directReplicaState = new DirectShardState(s -> liveNodes.contains(s)) {
+ @Override
+ public Replica getLeader(Slice slice, int timeout) throws InterruptedException {
+ return getLeaderRetry(slice.collection, slice.getName(), timeout);
+ }
+ };
/**
* Used to submit notifications to Collection Properties watchers in order
@@ -1736,7 +1741,7 @@ public class ZkStateReader implements SolrCloseable {
* @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
- * @see #waitForState(String, long, TimeUnit, Predicate)
+ * @see #waitForState(String, long, TimeUnit, BiPredicate)
* @see #registerCollectionStateWatcher
*/
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
@@ -1785,7 +1790,7 @@ public class ZkStateReader implements SolrCloseable {
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
- public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
+ public void waitForState(final String collection, long wait, TimeUnit unit, BiPredicate<DocCollection, ShardStateProvider> predicate)
throws InterruptedException, TimeoutException {
if (closed) {
@@ -1797,7 +1802,7 @@ public class ZkStateReader implements SolrCloseable {
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
DocCollectionWatcher watcher = (c, ssp) -> {
docCollection.set(c);
- boolean matches = predicate.test(c);
+ boolean matches = predicate.test(c, getShardStateProvider(collection));
if (matches)
latch.countDown();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index eec0d99..f7f3709 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -205,7 +206,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
- zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
+ zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection, ssp) -> {
if (docCollection == null)
return true;
return false;
@@ -219,12 +220,15 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(DEFAULT_COLLECTION);
for (; ; ) {
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection("collection1");
Slice slice = coll.getSlice(shardName);
- if (slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getLeader().getState() == Replica.State.ACTIVE) {
- log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(),timeOut.timeElapsed(MILLISECONDS) );
+
+ Replica leader = ssp.getLeader(slice);
+ if (leader != null && !leader.equals(oldLeader) && ssp.getState(leader) == Replica.State.ACTIVE) {
+ log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, leader,timeOut.timeElapsed(MILLISECONDS) );
break;
}
@@ -237,12 +241,13 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Thread.sleep(100);
}
- zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
+ zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection, sp) -> {
if (docCollection == null)
return false;
Slice slice = docCollection.getSlice(shardName);
- if (slice != null && slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getLeader().getState() == Replica.State.ACTIVE) {
+ Replica leader = sp.getLeader(slice);
+ if (slice != null && leader!= null && !leader.equals(oldLeader) && sp.getState(leader) == Replica.State.ACTIVE) {
log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(), timeOut.timeElapsed(MILLISECONDS) );
return true;
}
@@ -254,9 +259,9 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Replica.State expectedState) throws InterruptedException, TimeoutException {
log.info("verifyReplicaStatus ({}) shard={} coreNodeName={}", collection, shard, coreNodeName);
reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
- (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
+ (collectionState,ssp) -> collectionState != null && collectionState.getSlice(shard) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
- && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
+ && ssp.getState( collectionState.getSlice(shard).getReplicasMap().get(coreNodeName)) == expectedState);
}
protected static void assertAllActive(String collection, ZkStateReader zkStateReader)
@@ -268,8 +273,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
if (docCollection == null || docCollection.getSlices() == null) {
throw new IllegalArgumentException("Cannot find collection:" + collection);
}
-
- Map<String,Slice> slices = docCollection.getSlicesMap();
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collection);
+ Map<String,Slice> slices = docCollection.getSlicesMap();
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
if (slice.getState() != Slice.State.ACTIVE) {
@@ -278,8 +283,8 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Map<String,Replica> shards = slice.getReplicasMap();
for (Map.Entry<String,Replica> shard : shards.entrySet()) {
Replica replica = shard.getValue();
- if (replica.getState() != Replica.State.ACTIVE) {
- fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + replica.getState());
+ if (ssp.getState(replica) != Replica.State.ACTIVE) {
+ fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + ssp.getState(replica));
}
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index ee8165e..0fa592b 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -2118,13 +2118,14 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
assertNotNull(cs);
final DocCollection docCollection = cs.getCollectionOrNull(testCollectionName);
assertNotNull("No collection found for " + testCollectionName, docCollection);
+ ShardStateProvider ssp = cloudClient.getZkStateReader().getShardStateProvider(testCollectionName);
Slice shard = docCollection.getSlice(shardId);
assertNotNull("No Slice for "+shardId, shard);
allReplicasUp = true; // assume true
Collection<Replica> replicas = shard.getReplicas();
assertTrue("Did not find correct number of replicas. Expected:" + rf + " Found:" + replicas.size(), replicas.size() == rf);
- leader = shard.getLeader();
+ leader = ssp.getLeader(shard);
assertNotNull(leader);
log.info("Found "+replicas.size()+" replicas and leader on "+
leader.getNodeName()+" for "+shardId+" in "+testCollectionName);
@@ -2132,8 +2133,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// ensure all replicas are "active" and identify the non-leader replica
for (Replica replica : replicas) {
if (!zkShardTerms.canBecomeLeader(replica.getName()) ||
- replica.getState() != Replica.State.ACTIVE) {
- log.info("Replica {} is currently {}", replica.getName(), replica.getState());
+ ssp.getState(replica) != Replica.State.ACTIVE) {
+ log.info("Replica {} is currently {}", replica.getName(), ssp.getState(replica));
allReplicasUp = false;
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
index a6939b2..bffcd46 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.cloud.ShardStateProvider;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase.CloudJettyRunner;
import org.apache.solr.common.cloud.DocCollection;
@@ -650,6 +651,7 @@ public class ChaosMonkey {
if (docCollection == null) {
monkeyLog("Could not find collection {}", collectionName);
}
+ ShardStateProvider ssp = zkStateReader.getShardStateProvider(collectionName);
StringBuilder builder = new StringBuilder();
builder.append("Collection status: {");
for (Slice slice:docCollection.getSlices()) {
@@ -660,7 +662,7 @@ public class ChaosMonkey {
m.find();
String jettyPort = m.group(1);
builder.append(String.format(Locale.ROOT, "%s(%s): {state: %s, type: %s, leader: %s, Live: %s}, ",
- replica.getName(), jettyPort, replica.getState(), replica.getType(), (replica.get("leader")!= null), zkStateReader.getClusterState().liveNodesContain(replica.getNodeName())));
+ replica.getName(), jettyPort, ssp.getState(replica), replica.getType(), (replica.get("leader")!= null), zkStateReader.getClusterState().liveNodesContain(replica.getNodeName())));
}
if (slice.getReplicas().size() > 0) {
builder.setLength(builder.length() - 2);
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index fed8c12..c4601df 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
}
for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
- reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
+ reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState,ssp) -> collectionState == null ? true : false);
}
}