You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/24 02:18:20 UTC
[lucene-solr] 13/16: @1289 Allow getting the leader before being live.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 6884bde5845af3968b685347bba88a6d6cfdf3b5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 23:35:03 2021 -0600
@1289 Allow getting the leader before being live.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 5 ++-
.../java/org/apache/solr/cloud/ZkController.java | 5 +--
.../java/org/apache/solr/handler/IndexFetcher.java | 2 +-
.../processor/DistributedZkUpdateProcessor.java | 10 ++---
.../apache/solr/common/cloud/ZkStateReader.java | 52 ++++++++++++++++++----
5 files changed, 54 insertions(+), 20 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index ff6b669..eb377d6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -350,7 +350,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
// expected
}
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
+ Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500, false);
+
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
close = true;
@@ -404,7 +405,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
Replica leader;
try {
- leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+ leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500, false);
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a5a6b19..f146d1b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1433,11 +1433,8 @@ public class ZkController implements Closeable, Runnable {
throw new AlreadyClosedException();
}
- leader = zkStateReader.getLeaderRetry(collection, shardId, 500);
+ leader = zkStateReader.getLeaderRetry(collection, shardId, 500, false);
- if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + shardId + "/leader")) {
- break;
- }
} catch (TimeoutException timeoutException) {
}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 1419340..6ac1139 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -726,7 +726,7 @@ public class IndexFetcher {
ZkController zkController = solrCore.getCoreContainer().getZkController();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- cd.getCollectionName(), cd.getShardId());
+ cd.getCollectionName(), cd.getShardId(), 1500, false);
return leaderReplica;
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 7e8faeb..a5a8847 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -190,7 +190,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
try {
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1000);
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1000, false);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -645,7 +645,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
+ "failed since we're not in cloud mode.");
}
try {
- return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()).getCoreUrl();
+ return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 1500, false).getCoreUrl();
} catch (InterruptedException | TimeoutException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
@@ -717,14 +717,14 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
try {
// Not equivalent to getLeaderProps, which retries to find a leader.
// Replica leader = slice.getLeader();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 100);
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 100, false);
isLeader = leaderReplica.getName().equals(desc.getName());
if (log.isTraceEnabled()) log.trace("Are we leader for sending to replicas? {} phase={}", isLeader, phase);
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
shardId = cloudDesc.getShardId();
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1500, false);
}
}
@@ -891,7 +891,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
Slice mySlice = coll.getSlice(myShardId);
final Slice.State state = mySlice.getState();
if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
- Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
+ Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 1500, false);
boolean amILeader = myLeader.getName().equals(desc.getName());
if (amILeader) {
// Does the document belong to my hash range as well?
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index a73bb23..48c3329 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -980,6 +980,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
+ return getLeaderRetry(collection, shard, timeout, true);
+ }
+
+ /**
+ * Get shard leader properties, with retry if none exist.
+ */
+ public Replica getLeaderRetry(String collection, String shard, int timeout, boolean mustBeLive) throws InterruptedException, TimeoutException {
DocCollection coll = getClusterState().getCollectionOrNull(collection);
@@ -987,12 +994,20 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Slice slice = coll.getSlice(shard);
if (slice != null) {
Replica leader = slice.getLeader();
- if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
+ boolean valid;
+ try {
+ valid = mustBeLive ? isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
return leader;
}
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
+ if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && valid) {
return replica;
}
}
@@ -1007,15 +1022,36 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Slice slice = c.getSlice(shard);
if (slice == null) return false;
Replica leader = slice.getLeader();
- if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
- returnLeader.set(leader);
- return true;
+
+ if (leader != null && leader.getState() == Replica.State.ACTIVE) {
+ boolean valid = false;
+ try {
+ valid = mustBeLive ? isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ if (valid) {
+ returnLeader.set(leader);
+ return true;
+ }
}
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
- returnLeader.set(replica);
- return true;
+ if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
+ boolean valid = false;
+ try {
+ valid = mustBeLive ? zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ if (valid) {
+ returnLeader.set(replica);
+ return true;
+ }
}
}