You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 21:14:47 UTC
[lucene-solr] branch reference_impl_dev updated: @1167 Some state publish improvements.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 3b02c91 @1167 Some state publish improvements.
3b02c91 is described below
commit 3b02c91436bb1bdedd6c3252c8e7d68c1e25a71b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Nov 11 15:14:26 2020 -0600
@1167 Some state publish improvements.
---
.../java/org/apache/solr/cloud/StatePublisher.java | 36 ++++++++--
.../java/org/apache/solr/cloud/ZkController.java | 23 +++++--
.../apache/solr/cloud/overseer/ZkStateWriter.java | 80 ++++++++--------------
3 files changed, 76 insertions(+), 63 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 433db0c..9574f8d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -34,6 +36,8 @@ public class StatePublisher implements Closeable {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
+ private final Map<String,String> stateCache = new ConcurrentHashMap<>(32, 0.75f, 4);
+
public static class NoOpMessage extends ZkNodeProps {
}
@@ -90,11 +94,15 @@ public class StatePublisher implements Closeable {
}
private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
- String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
- String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
- String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
-
- bulkMessage.getProperties().put(core, collection + "," + state);
+ if (zkNodeProps.getStr("operation").equals("DOWNNODE")) {
+ bulkMessage.getProperties().put("DOWNNODE", zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP));
+ } else {
+ String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+ String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+ String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
+
+ bulkMessage.getProperties().put(core, collection + "," + state);
+ }
}
private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
@@ -107,9 +115,27 @@ public class StatePublisher implements Closeable {
}
public void submitState(ZkNodeProps stateMessage) {
+ // Don't allow publish of state we last published if not DOWNNODE
+ if (stateMessage != TERMINATE_OP) {
+ String operation = stateMessage.getStr("operation");
+ if (operation.equals("state")) {
+ String core = stateMessage.getStr(ZkStateReader.CORE_NAME_PROP);
+ String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
+ String lastState = stateCache.get(core);
+ if (state.equals(lastState)) {
+ return;
+ }
+ stateCache.put(core, state);
+ }
+ }
+
workQueue.offer(stateMessage);
}
+ public void clearStatCache(String core) {
+ stateCache.remove(core);
+ }
+
public void start() {
this.worker = new Worker();
workerFuture = ParWork.getRootSharedExecutor().submit(this.worker);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 4d55be8..0a77916 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -680,11 +680,6 @@ public class ZkController implements Closeable, Runnable {
this.isClosed = true;
- if (statePublisher != null) {
- statePublisher.submitState(StatePublisher.TERMINATE_OP);
- }
- IOUtils.closeQuietly(statePublisher);
-
try (ParWork closer = new ParWork(this, true, true)) {
closer.collect(overseer);
closer.collect(replicateFromLeaders);
@@ -696,6 +691,15 @@ public class ZkController implements Closeable, Runnable {
closer.collect(overseerContexts);
}
+ try {
+ if (statePublisher != null) {
+ statePublisher.submitState(StatePublisher.TERMINATE_OP);
+ }
+ IOUtils.closeQuietly(statePublisher);
+ } catch (Exception e) {
+ log.error("Exception closing state publisher");
+ }
+
IOUtils.closeQuietly(zkStateReader);
if (closeZkClient) {
@@ -1826,6 +1830,11 @@ public class ZkController implements Closeable, Runnable {
public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFromZk) throws Exception {
log.info("Unregister core from zookeeper {}", coreName);
+
+ if (statePublisher != null) {
+ statePublisher.clearStatCache(coreName);
+ }
+
if (!zkClient.isConnected()) return;
final String collection = cd.getCloudDescriptor().getCollectionName();
@@ -2531,8 +2540,8 @@ public class ZkController implements Closeable, Runnable {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
ZkStateReader.NODE_NAME_PROP, nodeName);
try {
- overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
- } catch (AlreadyClosedException | InterruptedException e) {
+ statePublisher.submitState(m);
+ } catch (AlreadyClosedException e) {
ParWork.propagateInterrupt("Not publishing node as DOWN because a resource required to do so is already closed.", null, true);
return;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 01535ba..13bb6cb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -154,10 +154,27 @@ public class ZkStateWriter {
}
switch (overseerAction) {
case STATE:
- // log.info("state cmd {}", message);
+ // log.info("state cmd {}", message);
message.getProperties().remove("operation");
for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+ if (entry.getKey().equals("DOWNNODE")) {
+ cs.forEachCollection(docColl -> {
+
+ List<Replica> replicas = docColl.getReplicas();
+ for (Replica replica : replicas) {
+ if (replica.getState() != Replica.State.DOWN) {
+ replica.setState(Replica.State.DOWN);
+ changed.set(true);
+ collectionsToWrite.add(docColl.getName());
+ }
+ }
+
+ });
+
+ continue;
+ }
+
String core = entry.getKey();
String collectionAndStateString = (String) entry.getValue();
String[] collectionAndState = collectionAndStateString.split(",");
@@ -195,61 +212,22 @@ public class ZkStateWriter {
}
break;
- case LEADER:
- // log.info("leader cmd");
- String collection = message.getStr("collection");
- DocCollection docColl = cs.getCollectionOrNull(collection);
- if (docColl != null) {
- Slice slice = docColl.getSlice(message.getStr("shard"));
- if (slice != null) {
- Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
- if (replica != null) {
- log.info("set leader {} {}", message.getStr(ZkStateReader.CORE_NAME_PROP), replica);
- slice.setLeader(replica);
- replica.setState(Replica.State.ACTIVE);
- replica.getProperties().put("leader", "true");
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica r : replicas) {
- if (r != replica) {
- r.getProperties().remove("leader");
- }
- }
- changed.set(true);
- collectionsToWrite.add(collection);
- }
- }
- }
- break;
-// case ADDROUTINGRULE:
-// return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
-// case REMOVEROUTINGRULE:
-// return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
+ // case ADDROUTINGRULE:
+ // return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
+ // case REMOVEROUTINGRULE:
+ // return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
case UPDATESHARDSTATE:
- collection = message.getStr("collection");
+ String collection = message.getStr("collection");
message.getProperties().remove("collection");
message.getProperties().remove("operation");
- docColl = cs.getCollectionOrNull(collection);
- if (docColl != null) {
- for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
- Slice slice = docColl.getSlice(entry.getKey());
- if (slice != null) {
- Slice.State state = Slice.State.getState((String) entry.getValue());
- slice.setState(state);
- changed.set(true);
- collectionsToWrite.add(collection);
- }
- }
- }
- break;
- case DOWNNODE:
- collection = message.getStr("collection");
- docColl = cs.getCollectionOrNull(collection);
+ DocCollection docColl = cs.getCollectionOrNull(collection);
if (docColl != null) {
- List<Replica> replicas = docColl.getReplicas();
- for (Replica replica : replicas) {
- if (replica.getState() != Replica.State.DOWN) {
- replica.setState(Replica.State.DOWN);
+ for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+ Slice slice = docColl.getSlice(entry.getKey());
+ if (slice != null) {
+ Slice.State state = Slice.State.getState((String) entry.getValue());
+ slice.setState(state);
changed.set(true);
collectionsToWrite.add(collection);
}