You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/10/13 08:24:10 UTC
[solr] branch jira/solr-16640 updated: cleaned up ZkStateWriter update
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr-16640
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/jira/solr-16640 by this push:
new 25d935f26cc cleaned up ZkStateWriter update
25d935f26cc is described below
commit 25d935f26cc0a428d3ee68c95940b60aafe178cc
Author: Noble Paul <no...@gmail.com>
AuthorDate: Thu Oct 13 19:24:00 2022 +1100
cleaned up ZkStateWriter update
---
solr/core/src/java/org/apache/solr/cloud/Overseer.java | 13 ++-----------
.../org/apache/solr/cloud/RefreshCollectionMessage.java | 12 +++++++-----
.../java/org/apache/solr/cloud/overseer/ZkStateWriter.java | 11 ++++++++++-
3 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a844ce33366..0ef75a8ff9f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -356,17 +356,8 @@ public class Overseer implements SolrCloseable {
log.info("CHECKPOINT9 , CS : {}", clusterState.hashCode());
Message m = unprocessedMessages.remove(0);
log.info("a_Message({})", m);
- ClusterState clusterStateModified = m.run(clusterState, Overseer.this);
- if(clusterStateModified != clusterState) {
- zkStateWriter.clusterState = clusterStateModified;
- }
- clusterState = clusterStateModified;
+ clusterState = m.run(clusterState, Overseer.this, zkStateWriter);
log.info("CHECKPOINT0 , CS : {}", clusterState.hashCode());
- if (m instanceof RefreshCollectionMessage) {
- RefreshCollectionMessage refreshCollectionMessage = (RefreshCollectionMessage) m;
- log.info("coll :{}, found : {}",refreshCollectionMessage.collection,
- clusterState.getCollectionOrNull(refreshCollectionMessage.collection) );
- }
}
log.info("CHECKPOINT5: Going to process queue item, hash:{}, " ,clusterState.hashCode());
@@ -1225,7 +1216,7 @@ public class Overseer implements SolrCloseable {
}
public interface Message {
- ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+ ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw) throws Exception;
}
/**
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index d58fbaf0480..a5930bc7e2c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -17,13 +17,16 @@
package org.apache.solr.cloud;
+import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.function.Function;
/** Refresh the Cluster State for a given collection */
public class RefreshCollectionMessage implements Overseer.Message {
@@ -35,7 +38,7 @@ public class RefreshCollectionMessage implements Overseer.Message {
this.collection = collection;
}
- public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+ public ClusterState run(ClusterState clusterState, Overseer overseer, ZkStateWriter zksw) throws Exception {
log.info("RefreshCollectionMessage({})", collection);
Stat stat =
overseer
@@ -54,11 +57,10 @@ public class RefreshCollectionMessage implements Overseer.Message {
} else {
log.info("RefreshCollectionMessage({}). stale, refreshed to ver {}", collection, stat.getVersion());
overseer.getZkStateReader().forceUpdateCollection(collection);
- coll = overseer.getZkStateReader().getCollectionLive(collection);
+ coll = overseer.getZkStateReader().getCollection(collection);
+ zksw.updateClusterState(it -> it.copyWith(collection, overseer.getZkStateReader().getCollection(collection)));
log.info("getCollectionLive coll {}", coll);
- ClusterState copiedCS = clusterState.copyWith(collection, coll);
- log.info("copied CS {}", copiedCS);
- return copiedCS;
+ return clusterState.copyWith(collection, coll);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 3752530c809..4f0adb16ae9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
@@ -67,7 +69,7 @@ public class ZkStateWriter {
protected Map<String, ZkWriteCommand> updates = new HashMap<>();
private int numUpdates = 0;
- public ClusterState clusterState = null;
+ protected ClusterState clusterState = null;
protected long lastUpdatedTime = 0;
/**
@@ -84,6 +86,13 @@ public class ZkStateWriter {
this.clusterState = zkStateReader.getClusterState();
}
+ /**
+ * If any collection is updated outside of this class (i.e. written directly to ZK), then it needs to be updated locally.
+ */
+ public void updateClusterState(Function<ClusterState, ClusterState> fun) {
+ clusterState = fun.apply(clusterState);
+ }
+
/**
* Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified {@link
* ClusterState} is returned and it is expected that the caller will use the returned cluster