You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/24 02:18:20 UTC

[lucene-solr] 13/16: @1289 Allow getting the leader before being live.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6884bde5845af3968b685347bba88a6d6cfdf3b5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 23:35:03 2021 -0600

    @1289 Allow getting the leader before being live.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  5 ++-
 .../java/org/apache/solr/cloud/ZkController.java   |  5 +--
 .../java/org/apache/solr/handler/IndexFetcher.java |  2 +-
 .../processor/DistributedZkUpdateProcessor.java    | 10 ++---
 .../apache/solr/common/cloud/ZkStateReader.java    | 52 ++++++++++++++++++----
 5 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index ff6b669..eb377d6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -350,7 +350,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
           // expected
         }
 
-        Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
+        Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500, false);
+
         if (leader != null && leader.getName().equals(coreName)) {
           log.info("We are the leader, STOP recovery");
           close = true;
@@ -404,7 +405,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
         Replica leader;
         try {
-          leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+          leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500, false);
 
           if (leader != null && leader.getName().equals(coreName)) {
             log.info("We are the leader, STOP recovery");
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a5a6b19..f146d1b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1433,11 +1433,8 @@ public class ZkController implements Closeable, Runnable {
             throw new AlreadyClosedException();
           }
 
-          leader = zkStateReader.getLeaderRetry(collection, shardId, 500);
+          leader = zkStateReader.getLeaderRetry(collection, shardId, 500, false);
 
-          if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + shardId  + "/leader")) {
-            break;
-          }
         } catch (TimeoutException timeoutException) {
 
         }
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 1419340..6ac1139 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -726,7 +726,7 @@ public class IndexFetcher {
     ZkController zkController = solrCore.getCoreContainer().getZkController();
     CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
     Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
-        cd.getCollectionName(), cd.getShardId());
+        cd.getCollectionName(), cd.getShardId(), 1500, false);
     return leaderReplica;
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 7e8faeb..a5a8847 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -190,7 +190,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
             EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
 
         try {
-          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1000);
+          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1000, false);
         } catch (Exception e) {
           ParWork.propagateInterrupt(e);
           throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -645,7 +645,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           + "failed since we're not in cloud mode.");
     }
     try {
-      return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()).getCoreUrl();
+      return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1500, false).getCoreUrl();
     } catch (InterruptedException | TimeoutException e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
@@ -717,14 +717,14 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     try {
       // Not equivalent to getLeaderProps, which  retries to find a leader.
       // Replica leader = slice.getLeader();
-      Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 100);
+      Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 100, false);
       isLeader = leaderReplica.getName().equals(desc.getName());
       if (log.isTraceEnabled()) log.trace("Are we leader for sending to replicas? {} phase={}", isLeader, phase);
       if (!isLeader) {
         isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
         if (isSubShardLeader) {
           shardId = cloudDesc.getShardId();
-          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1500, false);
         }
       }
 
@@ -891,7 +891,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     Slice mySlice = coll.getSlice(myShardId);
     final Slice.State state = mySlice.getState();
     if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
-      Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
+      Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 1500, false);
       boolean amILeader = myLeader.getName().equals(desc.getName());
       if (amILeader) {
         // Does the document belong to my hash range as well?
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index a73bb23..48c3329 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -980,6 +980,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
+    return getLeaderRetry(collection, shard, timeout, true);
+  }
+
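+  /**
+   * Get shard leader properties, with retry if none exist. When mustBeLive is true, the slice leader also counts as valid if the shard's leader znode exists even when its node is not live; when false, only node liveness is checked.
+   */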
+  public Replica getLeaderRetry(String collection, String shard, int timeout, boolean mustBeLive) throws InterruptedException, TimeoutException {
 
     DocCollection coll = getClusterState().getCollectionOrNull(collection);
 
@@ -987,12 +994,20 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       Slice slice = coll.getSlice(shard);
       if (slice != null) {
         Replica leader = slice.getLeader();
-        if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
+        boolean valid;
+        try {
+          valid = mustBeLive ? isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
+        } catch (KeeperException e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        } catch (InterruptedException e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        }
+        if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
           return leader;
         }
         Collection<Replica> replicas = slice.getReplicas();
         for (Replica replica : replicas) {
-          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
+          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && valid) {
             return replica;
           }
         }
@@ -1007,15 +1022,36 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         Slice slice = c.getSlice(shard);
         if (slice == null) return false;
         Replica leader = slice.getLeader();
-        if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
-          returnLeader.set(leader);
-          return true;
+
+        if (leader != null && leader.getState() == Replica.State.ACTIVE) {
+          boolean valid = false;
+          try {
+            valid = mustBeLive ?  isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName()  + "/leader") : isNodeLive(leader.getNodeName());
+          } catch (KeeperException e) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+          } catch (InterruptedException e) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+          }
+          if (valid) {
+            returnLeader.set(leader);
+            return true;
+          }
         }
         Collection<Replica> replicas = slice.getReplicas();
         for (Replica replica : replicas) {
-          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
-            returnLeader.set(replica);
-            return true;
+          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
+            boolean valid = false;
+            try {
+              valid = mustBeLive ?  zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName()  + "/leader") : isNodeLive(leader.getNodeName());
+            } catch (KeeperException e) {
+              throw new SolrException(ErrorCode.SERVER_ERROR, e);
+            } catch (InterruptedException e) {
+              throw new SolrException(ErrorCode.SERVER_ERROR, e);
+            }
+            if (valid) {
+              returnLeader.set(replica);
+              return true;
+            }
           }
         }