You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/27 03:08:32 UTC
[lucene-solr] branch reference_impl_dev updated: @1413 Dist commit leader bs.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 9fd027c @1413 Dist commit leader bs.
9fd027c is described below
commit 9fd027ca6bfe3e3b3f78924e9af2b23531d0a7e0
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Feb 26 21:07:26 2021 -0600
@1413 Dist commit leader bs.
Took 27 minutes
---
.../apache/solr/update/DeleteUpdateCommand.java | 5 +++
.../processor/DistributedZkUpdateProcessor.java | 37 +++++++++++-----------
.../solr/common/cloud/ConnectionManager.java | 9 +++---
3 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
index ac56ab3..3827ab5 100644
--- a/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/DeleteUpdateCommand.java
@@ -51,6 +51,11 @@ public class DeleteUpdateCommand extends UpdateCommand {
query = null;
indexedId = null;
version = 0;
+ commitWithin = -1;
+
+ flags = 0;
+ route = null;
+ indexedId = null;
}
/** Returns the indexed ID for this delete. The returned BytesRef is retained across multiple calls, and should not be modified. */
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 4f59f8e..e471b22 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -160,6 +160,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ Replica leaderReplica;
+ try {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId());
+ } catch (Exception e) {
+ ParWork.propagateInterrupt(e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+
+ }
+
+ isLeader = leaderReplica.getName().equals(desc.getName());
+
if (log.isDebugEnabled()) log.debug("processCommit - start commit isLeader={} commit_end_point={} replicaType={}", isLeader, req.getParams().get(COMMIT_END_POINT), replicaType);
try (ParWork worker = new ParWork(this, false, false)) {
@@ -175,7 +186,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
updateCommand = cmd;
List<SolrCmdDistributor.Node> nodes = null;
- Replica leaderReplica = null;
+
zkCheck();
if (req.getParams().get(COMMIT_END_POINT, "").equals("terminal") || (req.getParams().getBool("dist") != null && !req.getParams().getBool("dist"))) {
@@ -185,16 +196,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
return;
}
- try {
- leaderReplica = clusterState.getCollection(collection).getSlice(cloudDesc.getShardId()).getLeader();
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Exception finding leader for shard " + cloudDesc.getShardId(), e);
-
- }
- isLeader = leaderReplica.getName().equals(desc.getName());
-
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
if (log.isDebugEnabled()) {
@@ -214,7 +215,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
} else if (req.getParams().get(COMMIT_END_POINT, "").equals("leaders")) {
- sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+ sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
} else if (req.getParams().get(COMMIT_END_POINT) == null) {
// zk
List<SolrCmdDistributor.Node> useNodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
@@ -229,7 +230,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (removeNode != null) {
useNodes.remove(removeNode);
- sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+ sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
}
}
@@ -251,12 +252,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
}
- private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, Replica leaderReplica, ModifiableSolrParams params) {
+ private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, String leaderName, ModifiableSolrParams params) {
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(COMMIT_END_POINT, "replicas");
- List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+ List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderName);
if (log.isDebugEnabled()) log.debug(
"processCommit - Found the following replicas to send commit to {}",
@@ -948,9 +949,9 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
return false;
}
- protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
- if (log.isDebugEnabled()) log.debug("leader is {}", leaderReplica.getName());
- String leaderCoreNodeName = leaderReplica.getName();
+ protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, String leaderName) {
+ if (log.isDebugEnabled()) log.debug("leader is {}", leaderName);
+ String leaderCoreNodeName = leaderName;
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 8acf099..1716b5f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -315,9 +315,8 @@ public class ConnectionManager implements Watcher, Closeable {
}
});
}
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e);
- return;
+ } catch (AlreadyClosedException e) {
+ throw e;
} catch (Exception e1) {
log.error("Exception updating zk instance", e1);
SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e1);
@@ -326,7 +325,9 @@ public class ConnectionManager implements Watcher, Closeable {
} catch (AlreadyClosedException e) {
log.info("Ran into AlreadyClosedException on reconnect");
- return;
+ if (!getKeeper().getState().isAlive()) {
+ return;
+ }
} catch (Exception e) {
SolrException.log(log, "", e);
log.info("Could not connect due to error, trying again ..");