You are viewing a plain text version of this content. The hyperlink to its canonical version is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/25 03:09:26 UTC
[lucene-solr] branch reference_impl_dev updated: @1387 Short circuit to local leader for dist commit.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 21a6858 @1387 Short circuit to local leader for dist commit.
21a6858 is described below
commit 21a685844476d76ce6d12d8a4eac46cd945e6cd7
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Feb 24 20:58:56 2021 -0600
@1387 Short circuit to local leader for dist commit.
Took 23 minutes
---
.../processor/DistributedUpdateProcessor.java | 4 +-
.../processor/DistributedZkUpdateProcessor.java | 119 ++++++++++++---------
.../org/apache/solr/core/TestJmxIntegration.java | 6 +-
3 files changed, 76 insertions(+), 53 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 928523f..ec0cb7a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -490,7 +490,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
- if (log.isDebugEnabled()) log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ if (log.isDebugEnabled()) {
+ log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ }
return null;
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index b750ddc..3578058 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -201,6 +201,21 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
isLeader = leaderReplica.getName().equals(desc.getName());
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+
+ if (isLeader) {
+ SolrCmdDistributor.Node removeNode = null;
+ for (SolrCmdDistributor.Node node : nodes) {
+ if (node.getCoreName().equals(this.desc.getName())) {
+ removeNode = node;
+ }
+ }
+ if (removeNode != null) {
+ nodes.remove(removeNode);
+
+ sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
+ }
+ }
+
if (nodes == null) {
// This could happen if there are only pull replicas
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -222,42 +237,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
} else if (req.getParams().get(COMMIT_END_POINT, "").equals("leaders")) {
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
-
- params.set(COMMIT_END_POINT, "replicas");
-
- List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
-
- if (log.isDebugEnabled()) log.debug(
- "processCommit - Found the following replicas to send commit to {}",
- useNodes);
-
- if (useNodes != null && useNodes.size() > 0) {
- if (log.isDebugEnabled()) log.debug("processCommit - send commit to replicas nodes={}",
- useNodes);
-
- params.set(DISTRIB_FROM, Replica
- .getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
-
- List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
-
- worker.collect("distCommit", () -> {
- cmdDistrib.distribCommit(cmd, finalUseNodes, params);
- });
- }
-
- if (log.isDebugEnabled()) {
- log.debug(
- "processCommit - Do a local commit for leader");
- }
-
- worker.collect("localCommit", () -> {
- try {
- doLocalCommit(cmd);
- } catch (IOException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- });
+ sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica, params);
} else {
// zk
@@ -288,6 +268,45 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
}
+ private void sendCommitToReplicasAndLocalCommit(CommitUpdateCommand cmd, ParWork worker, Replica leaderReplica, ModifiableSolrParams params) {
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+
+ params.set(COMMIT_END_POINT, "replicas");
+
+ List<SolrCmdDistributor.Node> useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+
+ if (log.isDebugEnabled()) log.debug(
+ "processCommit - Found the following replicas to send commit to {}",
+ useNodes);
+
+ if (useNodes != null && useNodes.size() > 0) {
+ if (log.isDebugEnabled()) log.debug("processCommit - send commit to replicas nodes={}",
+ useNodes);
+
+ params.set(DISTRIB_FROM, Replica
+ .getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
+
+ List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
+
+ worker.collect("distCommit", () -> {
+ cmdDistrib.distribCommit(cmd, finalUseNodes, params);
+ });
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "processCommit - Do a local commit for leader");
+ }
+
+ worker.collect("localCommit", () -> {
+ try {
+ doLocalCommit(cmd);
+ } catch (IOException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ });
+ }
+
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
try {
@@ -688,23 +707,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
clusterState = zkController.getClusterState();
- DocCollection coll = clusterState.getCollection(collection);
- Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
-
- if (slice == null) {
- // No slice found. Most strict routers will have already thrown an exception, so a null return is
- // a signal to use the slice of this core.
- // TODO: what if this core is not in the targeted collection?
- String shardId = cloudDesc.getShardId();
- slice = coll.getSlice(shardId);
- if (slice == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
- }
- }
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+ DocCollection coll = clusterState.getCollection(collection);
+ Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
assert TestInjection.injectFailReplicaRequests();
@@ -715,6 +724,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
+ if (slice == null) {
+ // No slice found. Most strict routers will have already thrown an exception, so a null return is
+ // a signal to use the slice of this core.
+ // TODO: what if this core is not in the targeted collection?
+ String shardId = cloudDesc.getShardId();
+ slice = coll.getSlice(shardId);
+ if (slice == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+ }
+ }
+
String shardId = slice.getName();
Replica leaderReplica;
try {
@@ -726,7 +746,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
- shardId = cloudDesc.getShardId();
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
}
}
diff --git a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
index 009838b..cee5f62 100644
--- a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
+++ b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
@@ -183,7 +183,7 @@ public class TestJmxIntegration extends SolrTestCaseJ4 {
ObjectName name = nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
- timeout = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ timeout = new TimeOut(1000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
Integer oldNumDocs = null;
while (!timeout.hasTimedOut()) {
nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
@@ -195,7 +195,9 @@ public class TestJmxIntegration extends SolrTestCaseJ4 {
}
}
- assertNotNull("oldNumDocs should not be null", oldNumDocs);
+ if (oldNumDocs == null) {
+ oldNumDocs = 0;
+ }
assertU(adoc("id", "1"));
assertU("commit", commit());