You are viewing a plain text version of this content. The canonical link for it was not preserved in this text-only copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/25 03:09:26 UTC

[lucene-solr] branch reference_impl_dev updated: @1387 Short circuit to local leader for dist commit.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 21a6858  @1387 Short circuit to local leader for dist commit.
21a6858 is described below

commit 21a685844476d76ce6d12d8a4eac46cd945e6cd7
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Feb 24 20:58:56 2021 -0600

    @1387 Short circuit to local leader for dist commit.
    
    Took 23 minutes
---
 .../processor/DistributedUpdateProcessor.java      |   4 +-
 .../processor/DistributedZkUpdateProcessor.java    | 119 ++++++++++++---------
 .../org/apache/solr/core/TestJmxIntegration.java   |   6 +-
 3 files changed, 76 insertions(+), 53 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 928523f..ec0cb7a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -490,7 +490,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
             if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
               // This update is a repeat, or was reordered. We need to drop this update.
-              if (log.isDebugEnabled()) log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+              if (log.isDebugEnabled()) {
+                log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+              }
               return null;
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index b750ddc..3578058 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -201,6 +201,21 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         isLeader = leaderReplica.getName().equals(desc.getName());
 
         ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+
+        if (isLeader) {
+          SolrCmdDistributor.Node removeNode = null;
+          for (SolrCmdDistributor.Node node : nodes) {
+            if (node.getCoreName().equals(this.desc.getName())) {
+              removeNode = node;
+            }
+          }
+          if (removeNode != null) {
+            nodes.remove(removeNode);
+
+            sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+          }
+        }
+
         if (nodes == null) {
           // This could happen if there are only pull replicas
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -222,42 +237,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           }
         } else if (req.getParams().get(COMMIT_END_POINT, "").equals("leaders")) {
 
-          params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
-
-          params.set(COMMIT_END_POINT, "replicas");
-
-          List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
-
-          if (log.isDebugEnabled()) log.debug(
-              "processCommit - Found the following replicas to send commit to {}",
-              useNodes);
-
-          if (useNodes != null && useNodes.size() > 0) {
-            if (log.isDebugEnabled()) log.debug("processCommit - send commit to replicas nodes={}",
-                useNodes);
-
-            params.set(DISTRIB_FROM, Replica
-                .getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
-
-            List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
-
-            worker.collect("distCommit", () -> {
-              cmdDistrib.distribCommit(cmd, finalUseNodes, params);
-            });
-          }
-
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "processCommit - Do a local commit for leader");
-          }
-
-          worker.collect("localCommit", () -> {
-            try {
-              doLocalCommit(cmd);
-            } catch (IOException e) {
-              throw new SolrException(ErrorCode.SERVER_ERROR, e);
-            }
-          });
+          sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
         } else {
           // zk
 
@@ -288,6 +268,45 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
   }
 
+  private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, Replica leaderReplica, ModifiableSolrParams params) {
+    params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+
+    params.set(COMMIT_END_POINT, "replicas");
+
+    List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+
+    if (log.isDebugEnabled()) log.debug(
+        "processCommit - Found the following replicas to send commit to {}",
+        useNodes);
+
+    if (useNodes != null && useNodes.size() > 0) {
+      if (log.isDebugEnabled()) log.debug("processCommit - send commit to replicas nodes={}",
+          useNodes);
+
+      params.set(DISTRIB_FROM, Replica
+          .getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
+
+      List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
+
+      worker.collect("distCommit", () -> {
+        cmdDistrib.distribCommit(cmd, finalUseNodes, params);
+      });
+    }
+
+    if (log.isDebugEnabled()) {
+      log.debug(
+          "processCommit - Do a local commit for leader");
+    }
+
+    worker.collect("localCommit", () -> {
+      try {
+        doLocalCommit(cmd);
+      } catch (IOException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      }
+    });
+  }
+
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
     try {
@@ -688,23 +707,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     }
 
     clusterState = zkController.getClusterState();
-    DocCollection coll = clusterState.getCollection(collection);
-    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
-
-    if (slice == null) {
-      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
-      // a signal to use the slice of this core.
-      // TODO: what if this core is not in the targeted collection?
-      String shardId = cloudDesc.getShardId();
-      slice = coll.getSlice(shardId);
-      if (slice == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
-      }
-    }
 
     DistribPhase phase =
         DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
 
+    DocCollection coll = clusterState.getCollection(collection);
+    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
     if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
 
       assert TestInjection.injectFailReplicaRequests();
@@ -715,6 +724,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
     }
 
+    if (slice == null) {
+      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
+      // a signal to use the slice of this core.
+      // TODO: what if this core is not in the targeted collection?
+      String shardId = cloudDesc.getShardId();
+      slice = coll.getSlice(shardId);
+      if (slice == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+      }
+    }
+
     String shardId = slice.getName();
     Replica leaderReplica;
     try {
@@ -726,7 +746,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       if (!isLeader) {
         isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
         if (isSubShardLeader) {
-          shardId = cloudDesc.getShardId();
           leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
         }
       }
diff --git a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
index 009838b..cee5f62 100644
--- a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
+++ b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
@@ -183,7 +183,7 @@ public class TestJmxIntegration extends SolrTestCaseJ4 {
 
       ObjectName name = nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
 
-      timeout = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+      timeout = new TimeOut(1000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
       Integer oldNumDocs = null;
       while (!timeout.hasTimedOut()) {
         nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
@@ -195,7 +195,9 @@ public class TestJmxIntegration extends SolrTestCaseJ4 {
         }
       }
 
-      assertNotNull("oldNumDocs should not be null", oldNumDocs);
+      if (oldNumDocs == null) {
+        oldNumDocs = 0;
+      }
 
       assertU(adoc("id", "1"));
       assertU("commit", commit());