You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by is...@apache.org on 2022/10/14 06:19:15 UTC

[solr] branch branch_9x updated: SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 6cfdc5e2db0 SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter
6cfdc5e2db0 is described below

commit 6cfdc5e2db0cb593252ea4a68e248777ba8e4ff5
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Fri Oct 14 11:48:15 2022 +0530

    SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter
---
 solr/CHANGES.txt                                   |  2 ++
 .../src/java/org/apache/solr/cloud/Overseer.java   |  5 +++--
 .../solr/cloud/RefreshCollectionMessage.java       | 21 +++++++++++++++---
 .../api/collections/CollectionCommandContext.java  |  3 +--
 .../cloud/api/collections/CreateCollectionCmd.java | 25 +++-------------------
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  9 ++++++++
 6 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7ede5f17448..9de208ddca6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -189,6 +189,8 @@ Bug Fixes
 
 * SOLR-16460: ClusterState.copyWith is inconsistent (noble)
 
+* SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter (noble, Ishan Chattopadhyaya)
+
 Other Changes
 ---------------------
 * SOLR-16351: Upgrade Carrot2 to 4.4.3, upgrade randomizedtesting to 2.8.0. (Dawid Weiss)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index aa1dd2253f7..d0c615ab2db 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -350,7 +350,7 @@ public class Overseer implements SolrCloseable {
                 while (unprocessedMessages.size() > 0) {
                   clusterState = zkStateWriter.writePendingUpdates();
                   Message m = unprocessedMessages.remove(0);
-                  clusterState = m.run(clusterState, Overseer.this);
+                  clusterState = m.run(clusterState, Overseer.this, zkStateWriter);
                 }
                 // The callback always be called on this thread
                 clusterState =
@@ -1201,7 +1201,8 @@ public class Overseer implements SolrCloseable {
   }
 
   public interface Message {
-    ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+    ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw)
+        throws Exception;
   }
 
   /**
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index f9fca3cbddc..63628ab0f2c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -17,11 +17,15 @@
 
 package org.apache.solr.cloud;
 
+import org.apache.solr.cloud.overseer.ZkStateWriter;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.zookeeper.data.Stat;
 
-/** Refresh the Cluster State for a given collection */
+/**
+ * Refresh the ClusterState for a given collection. Useful for situations where updates to the
+ * cluster state happened outside of the overseer.
+ */
 public class RefreshCollectionMessage implements Overseer.Message {
   public final String collection;
 
@@ -29,7 +33,8 @@ public class RefreshCollectionMessage implements Overseer.Message {
     this.collection = collection;
   }
 
-  public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+  public ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zkStateWriter)
+      throws Exception {
     Stat stat =
         overseer
             .getZkStateReader()
@@ -44,7 +49,17 @@ public class RefreshCollectionMessage implements Overseer.Message {
       // our state is up to date
       return clusterState;
     } else {
-      coll = overseer.getZkStateReader().getCollectionLive(collection);
+      overseer.getZkStateReader().forceUpdateCollection(collection);
+      coll = overseer.getZkStateReader().getCollection(collection);
+
+      // During collection creation for a PRS collection, the cluster state (state.json) for the
+      // collection is written to ZK directly by the node that received the CREATE request.
+      // Hence, both the overseer's ZkStateWriter and the overseer's internal copy of the
+      // cluster state need to be updated via this refresh so that they contain that
+      // collection.
+
+      zkStateWriter.updateClusterState(
+          it -> it.copyWith(collection, overseer.getZkStateReader().getCollection(collection)));
       return clusterState.copyWith(collection, coll);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
index 7bf58ee1c55..8f0ce6aded1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
@@ -99,7 +99,6 @@ public interface CollectionCommandContext {
    * updates are distributed.
    */
   default void submitIntraProcessMessage(Overseer.Message message) {
-    throw new IllegalStateException(
-        "Bug! submitIntraProcessMessage() should not be called when distributed state updates are enabled");
+    // Intentionally a no-op: by default the submitted message is ignored.
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index f18713ea427..8e15d25ea2b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -195,15 +195,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
             .create(collectionPath, data, CreateMode.PERSISTENT, true);
         clusterState = clusterState.copyWith(collectionName, command.collection);
         newColl = command.collection;
-        // When cluster state updates are handled by Overseer, ask it to load that collection it
-        // doesn't know about. When cluster state updates are distributed, ZK is the source of truth
-        // for all nodes so no reload needed.
-        if (!ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
-          // If cluster state update is not distributed and we execute here, the Collection API is
-          // not distributed either and this execution happens on the Overseer node, so direct
-          // memory access as done below is ok.
-          ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
-        }
+        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
       } else {
         if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
           // The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and
@@ -383,10 +375,6 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
       // PRS collections updated ZK state.json in the loop above. When Overseer is managing cluster
       // state updates, need to tell it to refresh itself to know about the replicas and be able to
       // execute nodes shard requests regarding the replicas.
-      if (isPRS && !ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
-        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
-      }
-
       // Distributed updates don't need to do anything for PRS collections that wrote state.json
       // directly. For non PRS collections, distributed updates have to be executed if that's how
       // the cluster is configured
@@ -403,6 +391,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
             .flatMap(slice -> slice.getReplicas().stream())
             .filter(r -> coresToCreate.containsKey(r.getCoreName()))
             .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
+        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
       } else {
         // wait for all replica entries to be created and visible in local cluster state (updated by
         // ZK watches)
@@ -446,15 +435,6 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
         } else {
           failure = true;
         }
-        // When cluster state updates are distributed, Overseer state updater is not used and
-        // doesn't have to be notified of a new collection created elsewhere (which is how all
-        // collections are created). Note it is likely possibly to skip the the whole if (isPRS)
-        // bloc, but keeping distributed state updates as close in behavior to Overseer state
-        // updates for now.
-        if (!ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
-          // Now ask Overseer to fetch the latest state of collection from ZK
-          ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
-        }
       }
       if (failure) {
         // Let's cleanup as we hit an exception
@@ -466,6 +446,7 @@ public class CreateCollectionCmd implements CollApiCmds.CollectionApiCommand {
             ErrorCode.BAD_REQUEST,
             "Underlying core creation failed while creating collection: " + collectionName);
       } else {
+        ccc.submitIntraProcessMessage(new RefreshCollectionMessage(collectionName));
         log.debug("Finished create command on all shards for collection: {}", collectionName);
         // Emit a warning about production use of data driven functionality
         // Note: isAutoGeneratedConfigSet is always a clone of the _default configset
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 2908382e1c1..44d3be57c4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
@@ -84,6 +85,14 @@ public class ZkStateWriter {
     this.clusterState = zkStateReader.getClusterState();
   }
 
+  /**
+   * If a collection is updated outside of this class (e.g. written directly to ZK), then the
+   * cluster state held locally by this class needs to be updated as well.
+   */
+  public void updateClusterState(Function<ClusterState, ClusterState> fun) {
+    clusterState = fun.apply(clusterState);
+  }
+
   /**
    * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified {@link
    * ClusterState} is returned and it is expected that the caller will use the returned cluster