You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2023/01/03 10:52:32 UTC
[solr] 01/01: All changes from github.com/apache/solr/pull/1242
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/SOLR-16580_9_1
in repository https://gitbox.apache.org/repos/asf/solr.git
commit ce9101db9ff032ccb724edfd68f490badb61158e
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Jan 3 21:51:51 2023 +1100
All changes from github.com/apache/solr/pull/1242
---
.../solr/cloud/DistributedClusterStateUpdater.java | 5 +-
.../solr/cloud/overseer/ClusterStateMutator.java | 4 +-
.../solr/cloud/overseer/CollectionMutator.java | 7 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 15 +++-
.../org/apache/solr/core/backup/BackupManager.java | 2 +-
.../apache/solr/handler/admin/ClusterStatus.java | 5 ++
.../solr/handler/admin/CollectionsHandler.java | 6 +-
.../org/apache/solr/cloud/ClusterStateTest.java | 10 +--
.../OverseerCollectionConfigSetProcessorTest.java | 8 +-
.../test/org/apache/solr/cloud/SliceStateTest.java | 2 +-
.../solr/cloud/overseer/ZkStateReaderTest.java | 70 ++++++++++++----
.../solr/cloud/overseer/ZkStateWriterTest.java | 11 ++-
.../client/solrj/cloud/DistribStateManager.java | 12 +++
.../solrj/impl/ZkClientClusterStateProvider.java | 8 +-
.../solr/common/cloud/PerReplicaStatesFetcher.java | 10 +++
.../apache/solr/common/cloud/ZkStateReader.java | 17 ----
.../solrj/impl/BaseHttpClusterStateProvider.java | 27 +++++-
.../org/apache/solr/common/cloud/ClusterState.java | 92 +++++---------------
.../apache/solr/common/cloud/DocCollection.java | 97 ++++++++++++++--------
.../apache/solr/common/cloud/PerReplicaStates.java | 20 +++++
.../java/org/apache/solr/common/cloud/Replica.java | 49 ++++++-----
.../java/org/apache/solr/common/cloud/Slice.java | 26 +++++-
.../solrj/impl/CloudSolrClientCacheTest.java | 2 +-
.../cloud/PerReplicaStatesIntegrationTest.java | 8 +-
24 files changed, 326 insertions(+), 187 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index ca745b55ff6..2ee7cba3239 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -628,13 +628,16 @@ public class DistributedClusterStateUpdater {
// This factory method can detect a missing configName and supply it by reading it from the
// old ZK location.
// TODO in Solr 10 remove that factory method
- ClusterState clusterState =
+
+ ClusterState clusterState;
+ clusterState =
ZkClientClusterStateProvider.createFromJsonSupportingLegacyConfigName(
stat.getVersion(),
data,
Collections.emptySet(),
updater.getCollectionName(),
zkStateReader.getZkClient());
+
return clusterState;
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index f979d47a9af..f95959e33e5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -125,7 +125,9 @@ public class ClusterStateMutator {
}
assert !collectionProps.containsKey(CollectionAdminParams.COLL_CONF);
- DocCollection newCollection = new DocCollection(cName, slices, collectionProps, router, -1);
+ DocCollection newCollection =
+ new DocCollection(
+ cName, slices, collectionProps, router, -1, stateManager.getPrsSupplier(cName));
return new ZkWriteCommand(cName, newCollection);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 6269e7ba329..492fe9efeb5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -172,7 +172,12 @@ public class CollectionMutator {
DocCollection collection =
new DocCollection(
- coll.getName(), coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion());
+ coll.getName(),
+ coll.getSlicesMap(),
+ props,
+ coll.getRouter(),
+ coll.getZNodeVersion(),
+ stateManager.getPrsSupplier(coll.getName()));
if (replicaOps == null) {
return new ZkWriteCommand(coll.getName(), collection);
} else {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 8fb0ce287fc..6aa561632cd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -280,13 +280,24 @@ public class ZkStateWriter {
Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
DocCollection newCollection =
new DocCollection(
- name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion());
+ name,
+ c.getSlicesMap(),
+ c.getProperties(),
+ c.getRouter(),
+ stat.getVersion(),
+ new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path));
clusterState = clusterState.copyWith(name, newCollection);
} else {
log.debug("going to create_collection {}", path);
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection =
- new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0);
+ new DocCollection(
+ name,
+ c.getSlicesMap(),
+ c.getProperties(),
+ c.getRouter(),
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(reader.getZkClient(), path));
clusterState = clusterState.copyWith(name, newCollection);
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index 647886bca6a..660ea80cbaa 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -219,7 +219,7 @@ public class BackupManager {
repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
is.readBytes(arr, 0, (int) is.length());
- ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet());
+ ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet(), null);
return c_state.getCollection(collectionName);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index c1a7c31a02d..9130fcc317a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -188,6 +189,10 @@ public class ClusterStatus {
}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
+ if (message.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
+ PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
+ collectionStatus.put("PRS", prs);
+ }
collectionProps.add(name, collectionStatus);
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 801b56d743c..cdaf70fb453 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1237,7 +1237,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CLUSTERSTATUS,
(req, rsp, h) -> {
Map<String, Object> all =
- copy(req.getParams(), null, COLLECTION_PROP, SHARD_ID_PROP, _ROUTE_);
+ copy(req.getParams(), null, COLLECTION_PROP, SHARD_ID_PROP, _ROUTE_, "prs");
new ClusterStatus(
h.coreContainer.getZkController().getZkStateReader(), new ZkNodeProps(all))
.getClusterStatus(rsp.getValues());
@@ -2010,7 +2010,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
replicas.add(shard.getLeader());
}
for (Replica replica : replicas) {
- String state = replica.getStr(ZkStateReader.STATE_PROP);
+ State state = replica.getState();
if (log.isDebugEnabled()) {
log.debug(
"Checking replica status, collection={} replica={} state={}",
@@ -2019,7 +2019,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
state);
}
if (!n.contains(replica.getNodeName())
- || !state.equals(Replica.State.ACTIVE.toString())) {
+ || !state.equals(Replica.State.ACTIVE)) {
replicaNotAliveCnt++;
return false;
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index 1af173271e0..7acb960828a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -59,14 +59,14 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
Slice slice2 = new Slice("shard2", sliceToProps, null, "collection1");
slices.put("shard2", slice2);
collectionStates.put(
- "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT));
+ "collection1", new DocCollection("collection1", slices, props, DocRouter.DEFAULT, 0, null));
collectionStates.put(
- "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT));
+ "collection2", new DocCollection("collection2", slices, props, DocRouter.DEFAULT, 0, null));
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
byte[] bytes = Utils.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes));
- ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
+ ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
assertEquals(
"Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
@@ -90,13 +90,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
.get("node1")
.getStr("prop2"));
- loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes);
+ loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes, null);
assertEquals(
"Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
- loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes);
+ loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes, null);
assertEquals(
"Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index f90e3805405..c2fcc138c2c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -661,6 +661,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
handleCreateCollMessageProps(ZkNodeProps.load(bytes));
}
+ @SuppressWarnings("DirectInvocationOnMock")
private void handleCreateCollMessageProps(ZkNodeProps props) {
log.info("track created replicas / collections");
try {
@@ -676,7 +677,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
collName,
new ClusterState.CollectionRef(
new DocCollection(
- collName, new HashMap<>(), props.getProperties(), DocRouter.DEFAULT)));
+ collName,
+ new HashMap<>(),
+ props.getProperties(),
+ DocRouter.DEFAULT,
+ 0,
+ distribStateManagerMock.getPrsSupplier(collName))));
}
if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) {
replicas.add(props);
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index f12b9597ccb..4517cc1bc15 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -59,7 +59,7 @@ public class SliceStateTest extends SolrTestCaseJ4 {
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
byte[] bytes = Utils.toJSON(clusterState);
- ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
+ ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
assertSame(
"Default state not set to active",
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 73093e41f3f..a839f3e6854 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -51,12 +51,15 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.admin.ConfigSetsHandler;
+import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@LogLevel(
+ "org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG")
public class ZkStateReaderTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long TIMEOUT = 30;
@@ -132,7 +135,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0));
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1"))));
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
@@ -157,7 +162,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
@@ -168,7 +175,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
Map<String, Object> props = new HashMap<>();
props.put("x", "y");
props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
- state = new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0);
+ state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ props,
+ DocRouter.DEFAULT,
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
@@ -207,7 +222,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
@@ -239,7 +256,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();
@@ -256,7 +275,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
assertFalse(ref.isLazilyLoaded());
assertEquals(0, ref.get().getZNodeVersion());
- assertEquals(-1, ref.get().getChildNodesVersion());
+ // dummy node created +1 and deleted +1 so 2
+ assertEquals(2, ref.get().getChildNodesVersion());
DocCollection collection = ref.get();
PerReplicaStates prs =
@@ -275,7 +295,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ref = reader.getClusterState().getCollectionRef("c1");
assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
- assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now
+ assertEquals(3, ref.get().getChildNodesVersion()); // but child version should be 3 now
prs = ref.get().getPerReplicaStates();
PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
@@ -289,7 +309,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ref = reader.getClusterState().getCollectionRef("c1");
assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
// but child version should be 3 now (1 del + 1 add)
- assertEquals(3, ref.get().getChildNodesVersion());
+ assertEquals(5, ref.get().getChildNodesVersion());
// now delete the collection
wc = new ZkWriteCommand("c1", null);
@@ -314,7 +334,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ref = reader.getClusterState().getCollectionRef("c1");
assertFalse(ref.isLazilyLoaded());
assertEquals(0, ref.get().getZNodeVersion());
- assertEquals(-1, ref.get().getChildNodesVersion()); // child node version is reset
+ assertEquals(2, ref.get().getChildNodesVersion()); // child node version is reset
// re-add PRS
collection = ref.get();
@@ -335,7 +355,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ref = reader.getClusterState().getCollectionRef("c1");
// child version should be reset since the state.json node was deleted and re-created
- assertEquals(1, ref.get().getChildNodesVersion());
+ assertEquals(3, ref.get().getChildNodesVersion());
}
public void testForciblyRefreshAllClusterState() throws Exception {
@@ -354,9 +374,15 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new DocCollection(
"c1",
new HashMap<>(),
- Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ Map.of(
+ ZkStateReader.CONFIGNAME_PROP,
+ ConfigSetsHandler.DEFAULT_CONFIGSET_NAME,
+ DocCollection.CollectionStateProps.PER_REPLICA_STATE,
+ "true"),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
@@ -376,7 +402,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- ref.get().getZNodeVersion());
+ ref.get().getZNodeVersion(),
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
@@ -397,7 +425,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c2")));
ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
@@ -432,7 +462,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
DocCollection state2 =
new DocCollection(
@@ -440,7 +472,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- 0);
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
// do not listen to c2
fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
@@ -493,7 +527,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ZkStateReader.CONFIGNAME_PROP,
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
- currentVersion);
+ currentVersion,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath("c1")));
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index be38ec88fe6..27ba3dda4b8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -32,6 +32,7 @@ import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -161,7 +162,15 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
prsProps.put("perReplicaState", Boolean.TRUE);
ZkWriteCommand prs1 =
new ZkWriteCommand(
- "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+ "prs1",
+ new DocCollection(
+ "prs1",
+ new HashMap<>(),
+ prsProps,
+ DocRouter.DEFAULT,
+ 0,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ zkClient, DocCollection.getCollectionPath("c1"))));
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
// First write is flushed immediately
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
index 09be10c73c9..7f2189a63f1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistribStateManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -123,6 +124,17 @@ public interface DistribStateManager extends SolrCloseable {
throw new UnsupportedOperationException("Not implemented");
}
+ default DocCollection.PrsSupplier getPrsSupplier(String collName) {
+ return new DocCollection.PrsSupplier(
+ () -> {
+ try {
+ return getReplicaStates(DocCollection.getCollectionPath(collName));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
/**
* Remove data recursively.
*
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index cfb0911a142..98c89b1ee0d 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -28,6 +28,7 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -113,7 +114,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
}
}
}
- return ClusterState.createFromCollectionMap(version, stateMap, liveNodes);
+ return ClusterState.createFromCollectionMap(
+ version,
+ stateMap,
+ liveNodes,
+ new PerReplicaStatesFetcher.LazyPrsSupplier(
+ zkClient, DocCollection.getCollectionPath(coll)));
}
@Override
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
index f959cab3e03..c8baa8b4f07 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesFetcher.java
@@ -17,13 +17,17 @@
package org.apache.solr.common.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PerReplicaStatesFetcher {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link
* Stat#getCversion()} of state.json. If this is not modified, the same object is returned
@@ -50,4 +54,10 @@ public class PerReplicaStatesFetcher {
e);
}
}
+
+ public static class LazyPrsSupplier extends DocCollection.PrsSupplier {
+ public LazyPrsSupplier(SolrZkClient zkClient, String collectionPath) {
+ super(() -> PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null));
+ }
+ }
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 26239784115..d5f69e8a219 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1684,21 +1684,6 @@ public class ZkStateReader implements SolrCloseable {
throws KeeperException, InterruptedException {
String collectionPath = DocCollection.getCollectionPath(coll);
while (true) {
- ClusterState.initReplicaStateProvider(
- () -> {
- try {
- PerReplicaStates replicaStates =
- PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
- log.debug(
- "per-replica-state ver: {} fetched for initializing {} ",
- replicaStates.cversion,
- collectionPath);
- return replicaStates;
- } catch (Exception e) {
- // TODO
- throw new RuntimeException(e);
- }
- });
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1723,8 +1708,6 @@ public class ZkStateReader implements SolrCloseable {
}
}
return null;
- } finally {
- ClusterState.clearReplicaStateProvider();
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index 875f21a8ade..a2420ec37b2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -35,6 +35,8 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -123,6 +125,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
params.set("collection", collection);
}
params.set("action", "CLUSTERSTATUS");
+ params.set("prs", "true");
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
SimpleOrderedMap<?> cluster = (SimpleOrderedMap<?>) client.request(request).get("cluster");
@@ -147,7 +150,13 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
Set<String> liveNodes = new HashSet<>((List<String>) (cluster.get("live_nodes")));
this.liveNodes = liveNodes;
liveNodesTimestamp = System.nanoTime();
- ClusterState cs = ClusterState.createFromCollectionMap(znodeVersion, collectionsMap, liveNodes);
+ ClusterState cs = new ClusterState(liveNodes, new HashMap<>());
+ for (Map.Entry<String, Object> e : collectionsMap.entrySet()) {
+ @SuppressWarnings("rawtypes")
+ Map m = (Map) e.getValue();
+ cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, m));
+ }
+
if (clusterProperties != null) {
Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
if (properties != null) {
@@ -157,6 +166,22 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
return cs;
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private DocCollection fillPrs(int znodeVersion, Map.Entry<String, Object> e, Map m) {
+ DocCollection.PrsSupplier prsSupplier = null;
+ if (m.containsKey("PRS")) {
+ Map prs = (Map) m.remove("PRS");
+ prsSupplier =
+ new DocCollection.PrsSupplier(
+ () ->
+ new PerReplicaStates(
+ (String) prs.get("path"),
+ (Integer) prs.get("cversion"),
+ (List<String>) prs.get("states")));
+ }
+ return ClusterState.collectionFromObjects(e.getKey(), m, znodeVersion, prsSupplier);
+ }
+
@Override
public Set<String> getLiveNodes() {
if (liveNodes == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 3a96b8b8dcd..67adc9667dd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -26,12 +26,10 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -227,33 +225,49 @@ public class ClusterState implements JSONWriter.Writable {
* @param liveNodes list of live nodes
* @return the ClusterState
*/
- public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
+ public static ClusterState createFromJson(
+ int version, byte[] bytes, Set<String> liveNodes, DocCollection.PrsSupplier prsSupplier) {
if (bytes == null || bytes.length == 0) {
return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap());
}
@SuppressWarnings({"unchecked"})
Map<String, Object> stateMap =
(Map<String, Object>) Utils.fromJSON(bytes, 0, bytes.length, STR_INTERNER_OBJ_BUILDER);
- return createFromCollectionMap(version, stateMap, liveNodes);
+ return createFromCollectionMap(version, stateMap, liveNodes, prsSupplier);
+ }
+
+ @Deprecated
+ public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
+ return createFromJson(version, bytes, liveNodes, null);
}
public static ClusterState createFromCollectionMap(
- int version, Map<String, Object> stateMap, Set<String> liveNodes) {
+ int version,
+ Map<String, Object> stateMap,
+ Set<String> liveNodes,
+ DocCollection.PrsSupplier prsSupplier) {
Map<String, CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey();
@SuppressWarnings({"unchecked"})
DocCollection coll =
- collectionFromObjects(collectionName, (Map<String, Object>) entry.getValue(), version);
+ collectionFromObjects(
+ collectionName, (Map<String, Object>) entry.getValue(), version, prsSupplier);
collections.put(collectionName, new CollectionRef(coll));
}
return new ClusterState(collections, liveNodes);
}
+ @Deprecated
+ public static ClusterState createFromCollectionMap(
+ int version, Map<String, Object> stateMap, Set<String> liveNodes) {
+ return createFromCollectionMap(version, stateMap, liveNodes, null);
+ }
+
// TODO move to static DocCollection.loadFromMap
- private static DocCollection collectionFromObjects(
- String name, Map<String, Object> objs, int version) {
+ public static DocCollection collectionFromObjects(
+ String name, Map<String, Object> objs, int version, DocCollection.PrsSupplier prsSupplier) {
Map<String, Object> props;
Map<String, Slice> slices;
@@ -261,9 +275,6 @@ public class ClusterState implements JSONWriter.Writable {
if (log.isDebugEnabled()) {
log.debug("a collection {} has per-replica state", name);
}
- // this collection has replica states stored outside
- ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
- if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
}
@SuppressWarnings({"unchecked"})
Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(CollectionStateProps.SHARDS);
@@ -291,7 +302,7 @@ public class ClusterState implements JSONWriter.Writable {
router = DocRouter.getDocRouter((String) routerProps.get("name"));
}
- return new DocCollection(name, slices, props, router, version);
+ return new DocCollection(name, slices, props, router, version, prsSupplier);
}
@Override
@@ -427,63 +438,6 @@ public class ClusterState implements JSONWriter.Writable {
return collectionStates.size();
}
- interface ReplicaStatesProvider {
-
- Optional<ReplicaStatesProvider> get();
-
- PerReplicaStates getStates();
- }
-
- private static final ReplicaStatesProvider EMPTYSTATEPROVIDER =
- new ReplicaStatesProvider() {
- @Override
- public Optional<ReplicaStatesProvider> get() {
- return Optional.empty();
- }
-
- @Override
- public PerReplicaStates getStates() {
- throw new RuntimeException("Invalid operation");
- }
- };
-
- private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
-
- public static ReplicaStatesProvider getReplicaStatesProvider() {
- return (REPLICASTATES_PROVIDER.get() == null)
- ? EMPTYSTATEPROVIDER
- : REPLICASTATES_PROVIDER.get();
- }
-
- public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
- REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
- }
-
- public static void clearReplicaStateProvider() {
- REPLICASTATES_PROVIDER.remove();
- }
-
- private static class StatesProvider implements ReplicaStatesProvider {
- private final Supplier<PerReplicaStates> replicaStatesSupplier;
- private PerReplicaStates perReplicaStates;
- private boolean isPerReplicaState = false;
-
- public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
- this.replicaStatesSupplier = replicaStatesSupplier;
- }
-
- @Override
- public Optional<ReplicaStatesProvider> get() {
- return isPerReplicaState ? Optional.of(this) : Optional.empty();
- }
-
- @Override
- public PerReplicaStates getStates() {
- if (perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
- return perReplicaStates;
- }
- }
-
private static volatile Function<JSONParser, ObjectBuilder> STR_INTERNER_OBJ_BUILDER =
STANDARDOBJBUILDER;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 6d4f6394d45..4de7503b403 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
+import java.util.function.Supplier;
import org.apache.solr.common.cloud.Replica.ReplicaStateProps;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
@@ -63,13 +64,23 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final Boolean readOnly;
private final Boolean perReplicaState;
private final Map<String, Replica> replicaMap = new HashMap<>();
- private volatile PerReplicaStates perReplicaStates;
+ private PrsSupplier prsSupplier;
+ @Deprecated
public DocCollection(
String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
- this(name, slices, props, router, Integer.MAX_VALUE);
+ this(name, slices, props, router, Integer.MAX_VALUE, null);
}
+ @Deprecated
+ public DocCollection(
+ String name,
+ Map<String, Slice> slices,
+ Map<String, Object> props,
+ DocRouter router,
+ int zkVersion) {
+ this(name, slices, props, router, zkVersion, null);
+ }
/**
* @param name The name of the collection
* @param slices The logical shards of the collection. This is used directly and a copy is not
@@ -83,7 +94,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
Map<String, Slice> slices,
Map<String, Object> props,
DocRouter router,
- int zkVersion) {
+ int zkVersion,
+ PrsSupplier prsSupplier) {
super(props);
// -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental
// overwrites
@@ -100,9 +112,17 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
this.numPullReplicas = (Integer) verifyProp(props, CollectionStateProps.PULL_REPLICAS, 0);
this.perReplicaState =
(Boolean) verifyProp(props, CollectionStateProps.PER_REPLICA_STATE, Boolean.FALSE);
- ClusterState.getReplicaStatesProvider()
- .get()
- .ifPresent(it -> perReplicaStates = it.getStates());
+ if (this.perReplicaState) {
+ if (prsSupplier == null) {
+ throw new RuntimeException(
+ CollectionStateProps.PER_REPLICA_STATE
+ + " = true , but per-replica state supplier is not provided");
+ }
+ this.prsSupplier = prsSupplier;
+ for (Slice s : this.slices.values()) {
+ s.setPrsSupplier(prsSupplier);
+ }
+ }
Boolean readOnly = (Boolean) verifyProp(props, CollectionStateProps.READ_ONLY);
this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
@@ -139,30 +159,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
* only a replica is updated
*/
public DocCollection copyWith(PerReplicaStates newPerReplicaStates) {
- if (log.isDebugEnabled()) {
- log.debug(
- "collection :{} going to be updated : per-replica state :{} -> {}",
- name,
- getChildNodesVersion(),
- newPerReplicaStates.cversion);
+ if (this.prsSupplier != null) {
+ log.info("In-place update of PRS: {}", newPerReplicaStates);
+ this.prsSupplier.prs = newPerReplicaStates;
}
- if (getChildNodesVersion() >= newPerReplicaStates.cversion) return this;
- Set<String> modifiedReplicas =
- PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
- if (modifiedReplicas.isEmpty()) return this; // nothing is modified
- Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
- for (String s : modifiedReplicas) {
- Replica replica = getReplica(s);
- if (replica != null) {
- Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
- Slice shard = modifiedShards.get(replica.shard);
- modifiedShards.put(replica.shard, shard.copyWith(newReplica));
- }
- }
- DocCollection result =
- new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion);
- result.perReplicaStates = newPerReplicaStates;
- return result;
+ return this;
}
private void addNodeNameReplica(Replica replica) {
@@ -213,8 +214,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
* @return the resulting DocCollection
*/
public DocCollection copyWithSlices(Map<String, Slice> slices) {
- DocCollection result = new DocCollection(getName(), slices, propMap, router, znodeVersion);
- result.perReplicaStates = perReplicaStates;
+ DocCollection result =
+ new DocCollection(getName(), slices, propMap, router, znodeVersion, prsSupplier);
return result;
}
/** Return collection name. */
@@ -281,7 +282,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
public int getChildNodesVersion() {
- return perReplicaStates == null ? -1 : perReplicaStates.cversion;
+ PerReplicaStates prs = prsSupplier == null ? null : prsSupplier.get();
+ return prs == null ? -1 : prs.cversion;
}
public boolean isModified(int dataVersion, int childVersion) {
@@ -318,7 +320,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
+ "/"
+ znodeVersion
+ " "
- + (perReplicaStates == null ? "" : perReplicaStates.toString())
+ + (prsSupplier == null ? "" : prsSupplier.get())
+ ")="
+ toJSONString(this);
}
@@ -465,7 +467,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
public PerReplicaStates getPerReplicaStates() {
- return perReplicaStates;
+ return prsSupplier != null ? prsSupplier.get() : null;
+ }
+
+ public PrsSupplier getPrsSupplier() {
+ return prsSupplier;
}
public int getExpectedReplicaCount(Replica.Type type, int def) {
@@ -488,4 +494,27 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
String SHARDS = "shards";
String PER_REPLICA_STATE = "perReplicaState";
}
+
+ /**
+ * Holder for a collection's {@link PerReplicaStates}. It either starts out with a value, or asks
+ * the wrapped supplier when {@link #get()} finds no value yet and keeps the result.
+ */
+ public static class PrsSupplier implements Supplier<PerReplicaStates> {
+
+ private volatile PerReplicaStates prs;
+
+ private Supplier<PerReplicaStates> supplier;
+
+ /** Creates a holder whose value is obtained from {@code supplier} on demand. */
+ public PrsSupplier(Supplier<PerReplicaStates> supplier) {
+ this.supplier = supplier;
+ }
+
+ /** Creates a holder that is given its value up front. */
+ public PrsSupplier(PerReplicaStates prs) {
+ this.prs = prs;
+ }
+
+ @Override
+ public PerReplicaStates get() {
+ if (prs != null) {
+ return prs;
+ }
+ // nothing held yet: ask the wrapped supplier and keep what it returns
+ prs = supplier.get();
+ return prs;
+ }
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index f756ea1883f..698da842845 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.cloud.Replica.ReplicaStateProps;
@@ -302,4 +303,23 @@ public class PerReplicaStates implements ReflectMapWriter {
return duplicate;
}
}
+
+ /**
+ * Writes this object through the reflective default writer, except that the "states" entry is
+ * emitted as a list holding the string form of every state.
+ */
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ EntryWriter interceptor =
+ new EntryWriter() {
+ @Override
+ public EntryWriter put(CharSequence k, Object v) throws IOException {
+ if (!"states".equals(k.toString())) {
+ ew.put(k, v);
+ return this;
+ }
+ IteratorWriter statesWriter =
+ iw -> states.forEachEntry((s, state) -> iw.addNoEx(state.toString()));
+ ew.put("states", statesWriter);
+ return this;
+ }
+ };
+ ReflectMapWriter.super.writeMap(interceptor);
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 308d37faeca..dc90e6b975a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -137,11 +137,15 @@ public class Replica extends ZkNodeProps implements MapWriter {
public final String core;
public final Type type;
public final String shard, collection;
- private PerReplicaStates.State replicaState;
+ private DocCollection.PrsSupplier prsSupplier;
// mutable
private State state;
+ void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) {
+ this.prsSupplier = prsSupplier;
+ }
+
public Replica(String name, Map<String, Object> map, String collection, String shard) {
super(new HashMap<>());
propMap.putAll(map);
@@ -151,7 +155,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
this.node = (String) propMap.get(ReplicaStateProps.NODE_NAME);
this.core = (String) propMap.get(ReplicaStateProps.CORE_NAME);
this.type = Type.get((String) propMap.get(ReplicaStateProps.TYPE));
- readPrs();
// default to ACTIVE
this.state =
State.getState(
@@ -180,7 +183,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
if (props != null) {
this.propMap.putAll(props);
}
- readPrs();
validate();
}
@@ -202,7 +204,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
this.node = String.valueOf(details.get("node_name"));
this.propMap.putAll(details);
- readPrs();
type =
Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ReplicaStateProps.TYPE, "NRT")));
if (state == null)
@@ -211,22 +212,6 @@ public class Replica extends ZkNodeProps implements MapWriter {
validate();
}
- private void readPrs() {
- ClusterState.getReplicaStatesProvider()
- .get()
- .ifPresent(
- it -> {
- log.debug("A replica {} state fetched from per-replica state", name);
- replicaState = it.getStates().get(name);
- if (replicaState != null) {
- propMap.put(
- ReplicaStateProps.STATE,
- replicaState.state.toString().toLowerCase(Locale.ROOT));
- if (replicaState.isLeader) propMap.put(ReplicaStateProps.LEADER, "true");
- }
- });
- }
-
private final void validate() {
Objects.requireNonNull(this.name, "'name' must not be null");
Objects.requireNonNull(this.core, "'core' must not be null");
@@ -303,6 +288,14 @@ public class Replica extends ZkNodeProps implements MapWriter {
/** Returns the {@link State} of this replica. */
public State getState() {
+ if (prsSupplier != null) {
+ PerReplicaStates.State s = prsSupplier.get().get(name);
+ if (s != null) {
+ return s.state;
+ } else {
+ return State.DOWN;
+ }
+ }
return state;
}
@@ -312,7 +305,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
}
public boolean isActive(Set<String> liveNodes) {
- return this.node != null && liveNodes.contains(this.node) && this.state == State.ACTIVE;
+ return this.node != null && liveNodes.contains(this.node) && getState() == State.ACTIVE;
}
public Type getType() {
@@ -320,6 +313,10 @@ public class Replica extends ZkNodeProps implements MapWriter {
}
public boolean isLeader() {
+ if (prsSupplier != null) {
+ PerReplicaStates.State st = prsSupplier.get().get(name);
+ return st == null ? false : st.isLeader;
+ }
return getBool(ReplicaStateProps.LEADER, false);
}
@@ -354,16 +351,18 @@ public class Replica extends ZkNodeProps implements MapWriter {
if (state.isLeader) props.put(ReplicaStateProps.LEADER, "true");
}
Replica r = new Replica(name, props, collection, shard);
- r.replicaState = state;
return r;
}
public PerReplicaStates.State getReplicaState() {
- return replicaState;
+ if (prsSupplier != null) {
+ return prsSupplier.get().get(name);
+ }
+ return null;
}
public Object clone() {
- return new Replica(name, node, collection, shard, core, state, type, propMap);
+ return new Replica(name, node, collection, shard, core, getState(), type, propMap);
}
@Override
@@ -401,7 +400,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
.put(ReplicaStateProps.COLLECTION, collection, p)
.put(ReplicaStateProps.NODE_NAME, node, p)
.put(ReplicaStateProps.TYPE, type.toString(), p)
- .put(ReplicaStateProps.STATE, state.toString(), p);
+ // write the effective state from getState(), not the shard name
+ .put(ReplicaStateProps.STATE, getState().toString(), p);
};
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 5414942b480..76539e70ead 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -45,6 +45,18 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
public final String collection;
+ private DocCollection.PrsSupplier prsSupplier;
+
+ /**
+ * Installs the per-replica-state supplier on this slice and on each of its replicas, then
+ * resolves the leader if none has been resolved yet.
+ */
+ void setPrsSupplier(DocCollection.PrsSupplier prsSupplier) {
+ this.prsSupplier = prsSupplier;
+ // replicas receive the supplier before findLeader() is consulted
+ replicas.values().forEach(replica -> replica.setPrsSupplier(prsSupplier));
+ if (leader != null) {
+ return;
+ }
+ leader = findLeader();
+ }
+
/**
* Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON.
*/
@@ -132,7 +144,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
// FUTURE: optional per-slice override of the collection replicationFactor
private final Integer replicationFactor;
private final Map<String, Replica> replicas;
- private final Replica leader;
+ private Replica leader;
private final State state;
private final String parent;
private final Map<String, RoutingRule> routingRules;
@@ -204,8 +216,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
} else {
this.routingRules = null;
}
-
- leader = findLeader();
}
@SuppressWarnings({"unchecked"})
@@ -275,7 +285,15 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
}
public Replica getLeader() {
- return leader;
+ if (prsSupplier != null) {
+ // this is a PRS collection. leader may keep changing
+ return findLeader();
+ } else {
+ if (leader == null) {
+ leader = findLeader();
+ }
+ return leader;
+ }
}
public int getNumLeaderReplicas() {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 5be6a44e76d..44053fbeb57 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -87,7 +87,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
.build()) {
livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
ClusterState cs =
- ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet());
+ ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet(), null);
refs.put(collName, new Ref(collName));
colls.put(collName, cs.getCollectionOrNull(collName));
responses.put(
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
index f5081ae0820..66b8cf121df 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java
@@ -39,7 +39,13 @@ import org.slf4j.LoggerFactory;
/** This test would be faster if we simulated the zk state instead. */
@LogLevel(
- "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+ "org.apache.solr.common.cloud.ZkStateReader=DEBUG;"
+ + "org.apache.solr.handler.admin.CollectionsHandler=DEBUG;"
+ + "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;"
+ + "org.apache.solr.cloud.Overseer=INFO;"
+ + "org.apache.solr.common.cloud=INFO;"
+ + "org.apache.solr.cloud.api.collections=INFO;"
+ + "org.apache.solr.cloud.overseer=INFO")
public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());