You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 21:14:47 UTC

[lucene-solr] branch reference_impl_dev updated: @1167 Some state publish improvements.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 3b02c91  @1167 Some state publish improvements.
3b02c91 is described below

commit 3b02c91436bb1bdedd6c3252c8e7d68c1e25a71b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Nov 11 15:14:26 2020 -0600

    @1167 Some state publish improvements.
---
 .../java/org/apache/solr/cloud/StatePublisher.java | 36 ++++++++--
 .../java/org/apache/solr/cloud/ZkController.java   | 23 +++++--
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 80 ++++++++--------------
 3 files changed, 76 insertions(+), 63 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 433db0c..9574f8d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -34,6 +36,8 @@ public class StatePublisher implements Closeable {
   private static final Logger log = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
+  private final Map<String,String> stateCache = new ConcurrentHashMap<>(32, 0.75f, 4);
+
   public static class NoOpMessage extends ZkNodeProps {
   }
 
@@ -90,11 +94,15 @@ public class StatePublisher implements Closeable {
     }
 
     private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
-      String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
-      String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
-      String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
-
-      bulkMessage.getProperties().put(core, collection + "," + state);
+      if (zkNodeProps.getStr("operation").equals("DOWNNODE")) {
+        bulkMessage.getProperties().put("DOWNNODE", zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP));
+      } else {
+        String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+        String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+        String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
+
+        bulkMessage.getProperties().put(core, collection + "," + state);
+      }
     }
 
     private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
@@ -107,9 +115,27 @@ public class StatePublisher implements Closeable {
   }
 
   public void submitState(ZkNodeProps stateMessage) {
+    // Don't allow publish of state we last published if not DOWNNODE
+    if (stateMessage != TERMINATE_OP) {
+      String operation = stateMessage.getStr("operation");
+      if (operation.equals("state")) {
+        String core = stateMessage.getStr(ZkStateReader.CORE_NAME_PROP);
+        String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
+        String lastState = stateCache.get(core);
+        if (state.equals(lastState)) {
+          return;
+        }
+        stateCache.put(core, state);
+      }
+    }
+
     workQueue.offer(stateMessage);
   }
 
+  public void clearStatCache(String core) {
+    stateCache.remove(core);
+  }
+
   public void start() {
     this.worker = new Worker();
     workerFuture = ParWork.getRootSharedExecutor().submit(this.worker);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 4d55be8..0a77916 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -680,11 +680,6 @@ public class ZkController implements Closeable, Runnable {
 
     this.isClosed = true;
 
-    if (statePublisher != null) {
-      statePublisher.submitState(StatePublisher.TERMINATE_OP);
-    }
-    IOUtils.closeQuietly(statePublisher);
-
     try (ParWork closer = new ParWork(this, true, true)) {
       closer.collect(overseer);
       closer.collect(replicateFromLeaders);
@@ -696,6 +691,15 @@ public class ZkController implements Closeable, Runnable {
       closer.collect(overseerContexts);
     }
 
+    try {
+      if (statePublisher != null) {
+        statePublisher.submitState(StatePublisher.TERMINATE_OP);
+      }
+      IOUtils.closeQuietly(statePublisher);
+    } catch (Exception e) {
+      log.error("Exception closing state publisher");
+    }
+
     IOUtils.closeQuietly(zkStateReader);
 
     if (closeZkClient) {
@@ -1826,6 +1830,11 @@ public class ZkController implements Closeable, Runnable {
 
   public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFromZk) throws Exception {
     log.info("Unregister core from zookeeper {}", coreName);
+
+    if (statePublisher != null) {
+      statePublisher.clearStatCache(coreName);
+    }
+
     if (!zkClient.isConnected()) return;
     final String collection = cd.getCloudDescriptor().getCollectionName();
 
@@ -2531,8 +2540,8 @@ public class ZkController implements Closeable, Runnable {
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
         ZkStateReader.NODE_NAME_PROP, nodeName);
     try {
-      overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
-    } catch (AlreadyClosedException | InterruptedException e) {
+      statePublisher.submitState(m);
+    } catch (AlreadyClosedException e) {
       ParWork.propagateInterrupt("Not publishing node as DOWN because a resource required to do so is already closed.", null, true);
       return;
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 01535ba..13bb6cb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -154,10 +154,27 @@ public class ZkStateWriter {
         }
         switch (overseerAction) {
           case STATE:
-           // log.info("state cmd {}", message);
+            // log.info("state cmd {}", message);
             message.getProperties().remove("operation");
 
             for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+              if (entry.getKey().equals("DOWNNODE")) {
+                cs.forEachCollection(docColl -> {
+
+                  List<Replica> replicas = docColl.getReplicas();
+                  for (Replica replica : replicas) {
+                    if (replica.getState() != Replica.State.DOWN) {
+                      replica.setState(Replica.State.DOWN);
+                      changed.set(true);
+                      collectionsToWrite.add(docColl.getName());
+                    }
+                  }
+
+                });
+
+                continue;
+              }
+
               String core = entry.getKey();
               String collectionAndStateString = (String) entry.getValue();
               String[] collectionAndState = collectionAndStateString.split(",");
@@ -195,61 +212,22 @@ public class ZkStateWriter {
             }
 
             break;
-          case LEADER:
-           // log.info("leader cmd");
-            String collection = message.getStr("collection");
-            DocCollection docColl = cs.getCollectionOrNull(collection);
-            if (docColl != null) {
-              Slice slice = docColl.getSlice(message.getStr("shard"));
-              if (slice != null) {
-                Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
-                if (replica != null) {
-                  log.info("set leader {} {}", message.getStr(ZkStateReader.CORE_NAME_PROP), replica);
-                  slice.setLeader(replica);
-                  replica.setState(Replica.State.ACTIVE);
-                  replica.getProperties().put("leader", "true");
-                  Collection<Replica> replicas = slice.getReplicas();
-                  for (Replica r : replicas) {
-                    if (r != replica) {
-                      r.getProperties().remove("leader");
-                    }
-                  }
-                  changed.set(true);
-                  collectionsToWrite.add(collection);
-                }
-              }
-            }
-            break;
-//          case ADDROUTINGRULE:
-//            return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
-//          case REMOVEROUTINGRULE:
-//            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
+          //          case ADDROUTINGRULE:
+          //            return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
+          //          case REMOVEROUTINGRULE:
+          //            return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
           case UPDATESHARDSTATE:
-            collection = message.getStr("collection");
+            String collection = message.getStr("collection");
             message.getProperties().remove("collection");
             message.getProperties().remove("operation");
 
-              docColl = cs.getCollectionOrNull(collection);
-              if (docColl != null) {
-                for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
-                  Slice slice = docColl.getSlice(entry.getKey());
-                  if (slice != null) {
-                    Slice.State state = Slice.State.getState((String) entry.getValue());
-                    slice.setState(state);
-                    changed.set(true);
-                    collectionsToWrite.add(collection);
-                  }
-                }
-              }
-            break;
-          case DOWNNODE:
-            collection = message.getStr("collection");
-            docColl = cs.getCollectionOrNull(collection);
+            DocCollection docColl = cs.getCollectionOrNull(collection);
             if (docColl != null) {
-              List<Replica> replicas = docColl.getReplicas();
-              for (Replica replica : replicas) {
-                if (replica.getState() != Replica.State.DOWN) {
-                  replica.setState(Replica.State.DOWN);
+              for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+                Slice slice = docColl.getSlice(entry.getKey());
+                if (slice != null) {
+                  Slice.State state = Slice.State.getState((String) entry.getValue());
+                  slice.setState(state);
                   changed.set(true);
                   collectionsToWrite.add(collection);
                 }