You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/10/13 08:24:10 UTC

[solr] branch jira/solr-16640 updated: cleaned up ZkStateWriter update

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr-16640
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/jira/solr-16640 by this push:
     new 25d935f26cc cleaned up ZkStateWriter update
25d935f26cc is described below

commit 25d935f26cc0a428d3ee68c95940b60aafe178cc
Author: Noble Paul <no...@gmail.com>
AuthorDate: Thu Oct 13 19:24:00 2022 +1100

    cleaned up ZkStateWriter update
---
 solr/core/src/java/org/apache/solr/cloud/Overseer.java      | 13 ++-----------
 .../org/apache/solr/cloud/RefreshCollectionMessage.java     | 12 +++++++-----
 .../java/org/apache/solr/cloud/overseer/ZkStateWriter.java  | 11 ++++++++++-
 3 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a844ce33366..0ef75a8ff9f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -356,17 +356,8 @@ public class Overseer implements SolrCloseable {
                   log.info("CHECKPOINT9 , CS : {}", clusterState.hashCode());
                   Message m = unprocessedMessages.remove(0);
                   log.info("a_Message({})", m);
-                  ClusterState clusterStateModified = m.run(clusterState, Overseer.this);
-                  if(clusterStateModified != clusterState) {
-                    zkStateWriter.clusterState = clusterStateModified;
-                  }
-                  clusterState = clusterStateModified;
+                  clusterState = m.run(clusterState, Overseer.this, zkStateWriter);
                   log.info("CHECKPOINT0 , CS : {}", clusterState.hashCode());
-                  if (m instanceof RefreshCollectionMessage) {
-                    RefreshCollectionMessage refreshCollectionMessage = (RefreshCollectionMessage) m;
-                    log.info("coll :{}, found : {}",refreshCollectionMessage.collection,
-                            clusterState.getCollectionOrNull(refreshCollectionMessage.collection) );
-                  }
                 }
 
                 log.info("CHECKPOINT5: Going to process queue item, hash:{}, " ,clusterState.hashCode());
@@ -1225,7 +1216,7 @@ public class Overseer implements SolrCloseable {
   }
 
   public interface Message {
-    ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+    ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw) throws Exception;
   }
 
   /**
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index d58fbaf0480..a5930bc7e2c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -17,13 +17,16 @@
 
 package org.apache.solr.cloud;
 
+import org.apache.solr.cloud.overseer.ZkStateWriter;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
+import java.util.function.Function;
 
 /** Refresh the Cluster State for a given collection */
 public class RefreshCollectionMessage implements Overseer.Message {
@@ -35,7 +38,7 @@ public class RefreshCollectionMessage implements Overseer.Message {
     this.collection = collection;
   }
 
-  public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+  public ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw) throws Exception {
     log.info("RefreshCollectionMessage({})", collection);
     Stat stat =
         overseer
@@ -54,11 +57,10 @@ public class RefreshCollectionMessage implements Overseer.Message {
     } else {
       log.info("RefreshCollectionMessage({}). stale, refreshed to ver {}", collection, stat.getVersion());
       overseer.getZkStateReader().forceUpdateCollection(collection);
-      coll = overseer.getZkStateReader().getCollectionLive(collection);
+      coll = overseer.getZkStateReader().getCollection(collection);
+      zksw.updateClusterState(it -> it.copyWith(collection, overseer.getZkStateReader().getCollection(collection)));
       log.info("getCollectionLive coll {}",  coll);
-      ClusterState copiedCS = clusterState.copyWith(collection, coll);
-      log.info("copied CS {}",  copiedCS);
-      return copiedCS;
+      return clusterState.copyWith(collection, coll);
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 3752530c809..4f0adb16ae9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
@@ -67,7 +69,7 @@ public class ZkStateWriter {
 
   protected Map<String, ZkWriteCommand> updates = new HashMap<>();
   private int numUpdates = 0;
-  public ClusterState clusterState = null;
+  protected ClusterState clusterState = null;
   protected long lastUpdatedTime = 0;
 
   /**
@@ -84,6 +86,13 @@ public class ZkStateWriter {
     this.clusterState = zkStateReader.getClusterState();
   }
 
+  /**
+   * If any collection is updated outside this class (i.e. written directly to ZK), the locally held cluster state needs to be updated through this method.
+   */
+  public void updateClusterState(Function<ClusterState, ClusterState> fun) {
+    clusterState = fun.apply(clusterState);
+  }
+
   /**
    * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified {@link
    * ClusterState} is returned and it is expected that the caller will use the returned cluster