You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/27 03:08:32 UTC

[lucene-solr] branch reference_impl_dev updated: @1413 Dist commit leader bs.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 9fd027c  @1413 Dist commit leader bs.
9fd027c is described below

commit 9fd027ca6bfe3e3b3f78924e9af2b23531d0a7e0
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Feb 26 21:07:26 2021 -0600

    @1413 Dist commit leader bs.
    
    Took 27 minutes
---
 .../apache/solr/update/DeleteUpdateCommand.java    |  5 +++
 .../processor/DistributedZkUpdateProcessor.java    | 37 +++++++++++-----------
 .../solr/common/cloud/ConnectionManager.java       |  9 +++---
 3 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
index ac56ab3..3827ab5 100644
--- a/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
@@ -51,6 +51,11 @@ public class DeleteUpdateCommand extends UpdateCommand {
     query = null;
     indexedId = null;
     version = 0;
+    commitWithin = -1;
+
+    flags = 0;
+    route = null;
+    indexedId = null;
   }
 
   /** Returns the indexed ID for this delete.  The returned BytesRef is retained across multiple calls, and should not be modified. */
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 4f59f8e..e471b22 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -160,6 +160,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    Replica leaderReplica;
+    try {
+      leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId());
+    } catch (Exception e) {
+      ParWork.propagateInterrupt(e);
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+
+    }
+
+    isLeader = leaderReplica.getName().equals(desc.getName());
+    
     if (log.isDebugEnabled()) log.debug("processCommit - start commit isLeader={} commit_end_point={} replicaType={}", isLeader, req.getParams().get(COMMIT_END_POINT), replicaType);
 
       try (ParWork worker = new ParWork(this, false, false)) {
@@ -175,7 +186,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         updateCommand = cmd;
 
         List<SolrCmdDistributor.Node> nodes = null;
-        Replica leaderReplica = null;
+
         zkCheck();
 
         if (req.getParams().get(COMMIT_END_POINT, "").equals("terminal") || (req.getParams().getBool("dist") != null && !req.getParams().getBool("dist"))) {
@@ -185,16 +196,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           return;
         }
 
-        try {
-          leaderReplica = clusterState.getCollection(collection).getSlice(cloudDesc.getShardId()).getLeader();
-        } catch (Exception e) {
-          ParWork.propagateInterrupt(e);
-          throw new SolrException(ErrorCode.SERVER_ERROR,
-              "Exception finding leader for shard " + cloudDesc.getShardId(), e);
-
-        }
-        isLeader = leaderReplica.getName().equals(desc.getName());
-
         ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
 
         if (log.isDebugEnabled()) {
@@ -214,7 +215,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           }
         } else if (req.getParams().get(COMMIT_END_POINT, "").equals("leaders")) {
 
-          sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+          sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
         }  else if (req.getParams().get(COMMIT_END_POINT) == null) {
           // zk
           List<SolrCmdDistributor.Node> useNodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
@@ -229,7 +230,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
             if (removeNode != null) {
               useNodes.remove(removeNode);
 
-              sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+              sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
             }
           }
 
@@ -251,12 +252,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
   }
 
-  private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, Replica leaderReplica, ModifiableSolrParams params) {
+  private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, String leaderName, ModifiableSolrParams params) {
     params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
 
     params.set(COMMIT_END_POINT, "replicas");
 
-    List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+    List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderName);
 
     if (log.isDebugEnabled()) log.debug(
         "processCommit - Found the following replicas to send commit to {}",
@@ -948,9 +949,9 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     return false;
   }
 
-  protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
-    if (log.isDebugEnabled()) log.debug("leader is {}", leaderReplica.getName());
-    String leaderCoreNodeName = leaderReplica.getName();
+  protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, String leaderName) {
+    if (log.isDebugEnabled()) log.debug("leader is {}", leaderName);
+    String leaderCoreNodeName = leaderName;
     List<Replica> replicas = clusterState.getCollection(collection)
         .getSlice(shardId)
         .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 8acf099..1716b5f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -315,9 +315,8 @@ public class ConnectionManager implements Watcher, Closeable {
               }
             });
           }
-        } catch (InterruptedException | AlreadyClosedException e) {
-          ParWork.propagateInterrupt(e);
-          return;
+        } catch (AlreadyClosedException e) {
+          throw e;
         } catch (Exception e1) {
           log.error("Exception updating zk instance", e1);
           SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e1);
@@ -326,7 +325,9 @@ public class ConnectionManager implements Watcher, Closeable {
 
       } catch (AlreadyClosedException e) {
         log.info("Ran into AlreadyClosedException on reconnect");
-        return;
+        if (!getKeeper().getState().isAlive()) {
+          return;
+        }
       } catch (Exception e) {
         SolrException.log(log, "", e);
         log.info("Could not connect due to error, trying again ..");