You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@solr.apache.org by is...@apache.org on 2022/10/14 06:18:30 UTC
[solr] branch main updated: SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 8c7c1eb4839 SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter
8c7c1eb4839 is described below
commit 8c7c1eb483943398a3a3b63cc64d1ab7645f37c5
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Fri Oct 14 11:48:15 2022 +0530
SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter
---
solr/CHANGES.txt | 2 ++
.../src/java/org/apache/solr/cloud/Overseer.java | 5 +++--
.../solr/cloud/RefreshCollectionMessage.java | 21 +++++++++++++++---
.../api/collections/CollectionCommandContext.java | 3 +--
.../cloud/api/collections/CreateCollectionCmd.java | 25 +++-------------------
.../apache/solr/cloud/overseer/ZkStateWriter.java | 9 ++++++++
6 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b5dd3f15d27..bebd1c73ffc 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -219,6 +219,8 @@ Bug Fixes
* SOLR-16460: ClusterState.copyWith is inconsistent (noble)
+* SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter (noble, Ishan Chattopadhyaya)
+
Other Changes
---------------------
* SOLR-16351: Upgrade Carrot2 to 4.4.3, upgrade randomizedtesting to 2.8.0. (Dawid Weiss)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index aa1dd2253f7..d0c615ab2db 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -350,7 +350,7 @@ public class Overseer implements SolrCloseable {
while (unprocessedMessages.size() > 0) {
clusterState = zkStateWriter.writePendingUpdates();
Message m = unprocessedMessages.remove(0);
- clusterState = m.run(clusterState, Overseer.this);
+ clusterState = m.run(clusterState, Overseer.this, zkStateWriter);
}
// The callback always be called on this thread
clusterState =
@@ -1201,7 +1201,8 @@ public class Overseer implements SolrCloseable {
}
public interface Message {
- ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+ ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw)
+ throws Exception;
}
/**
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index f9fca3cbddc..63628ab0f2c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -17,11 +17,15 @@
package org.apache.solr.cloud;
+import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.zookeeper.data.Stat;
-/** Refresh the Cluster State for a given collection */
+/**
+ * Refresh the ClusterState for a given collection. Useful for situations where updates to the
+ * cluster state happened outside of the overseer.
+ */
public class RefreshCollectionMessage implements Overseer.Message {
public final String collection;
@@ -29,7 +33,8 @@ public class RefreshCollectionMessage implements Overseer.Message {
this.collection = collection;
}
- public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+ public ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zkStateWriter)
+ throws Exception {
Stat stat =
overseer
.getZkStateReader()
@@ -44,7 +49,17 @@ public class RefreshCollectionMessage implements Overseer.Message {
// our state is up to date
return clusterState;
} else {
- coll = overseer.getZkStateReader().getCollectionLive(collection);
+ overseer.getZkStateReader().forceUpdateCollection(collection);
+ coll = overseer.getZkStateReader().getCollection(collection);
+
+ // During collection creation for a PRS collection, the cluster state (state.json) for the
+ // collection is written to ZK directly by the node (that received the CREATE request).
+ // Hence, we need the overseer's ZkStateWriter and the overseer's internal copy of the cluster
+ // state
+ // to be updated to contain that collection via this refresh.
+
+ zkStateWriter.updateClusterState(
+ it -> it.copyWith(collection, overseer.getZkStateReader().getCollection(collection)));
return clusterState.copyWith(collection, coll);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
index 7bf58ee1c55..8f0ce6aded1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
@@ -99,7 +99,6 @@ public interface CollectionCommandContext {
* updates are distributed.
*/
default void submitIntraProcessMessage(Overseer.Message message) {
- throw new IllegalStateException(
- "Bug! submitIntraProcessMessage() should not be called when distributed state updates are enabled");
+ // this is ignored
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index f18713ea427..8e15d25ea2b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -195,15 +195,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
.create(collectionPath, data, CreateMode.PERSISTENT, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
- // When cluster state updates are handled by Overseer, ask it to load that collection it
- // doesn't know about. When cluster state updates are distributed, ZK is the source of truth
- // for all nodes so no reload needed.
- if (!ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
- // If cluster state update is not distributed and we execute here, the Collection API is
- // not distributed either and this execution happens on the Overseer node, so direct
- // memory access as done below is ok.
- ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
- }
+ ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
} else {
if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
// The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and
@@ -383,10 +375,6 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
// PRS collections updated ZK state.json in the loop above. When Overseer is managing cluster
// state updates, need to tell it to refresh itself to know about the replicas and be able to
// execute nodes shard requests regarding the replicas.
- if (isPRS && !ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
- ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
- }
-
// Distributed updates don't need to do anything for PRS collections that wrote state.json
// directly. For non PRS collections, distributed updates have to be executed if that's how
// the cluster is configured
@@ -403,6 +391,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
.flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coresToCreate.containsKey(r.getCoreName()))
.forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
+ ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
} else {
// wait for all replica entries to be created and visible in local cluster state (updated by
// ZK watches)
@@ -446,15 +435,6 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
} else {
failure = true;
}
- // When cluster state updates are distributed, Overseer state updater is not used and
- // doesn't have to be notified of a new collection created elsewhere (which is how all
- // collections are created). Note it is likely possibly to skip the the whole if (isPRS)
- // bloc, but keeping distributed state updates as close in behavior to Overseer state
- // updates for now.
- if (!ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
- // Now ask Overseer to fetch the latest state of collection from ZK
- ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
- }
}
if (failure) {
// Let's cleanup as we hit an exception
@@ -466,6 +446,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
ErrorCode.BAD_REQUEST,
"Underlying core creation failed while creating collection: " + collectionName);
} else {
+ ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
log.debug("Finished create command on all shards for collection: {}", collectionName);
// Emit a warning about production use of data driven functionality
// Note: isAutoGeneratedConfigSet is always a clone of the _default configset
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 2908382e1c1..44d3be57c4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
@@ -84,6 +85,14 @@ public class ZkStateWriter {
this.clusterState = zkStateReader.getClusterState();
}
+ /**
+ * if any collection is updated not through this class (directly written to ZK, then it needs to
+ * be updated locally)
+ */
+ public void updateClusterState(Function<ClusterState, ClusterState> fun) {
+ clusterState = fun.apply(clusterState);
+ }
+
/**
* Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified {@link
* ClusterState} is returned and it is expected that the caller will use the returned cluster