You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 06:49:19 UTC
[lucene-solr] 01/03: @497 Parallel local / dist commit.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 78af98df127ed08c5d1856f3e10fc1a660094c37
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 01:22:04 2020 -0500
@497 Parallel local / dist commit (on the leader, run the local commit and the distributed commit to replicas in parallel).
---
.../processor/DistributedZkUpdateProcessor.java | 48 ++++++++++++----------
1 file changed, 26 insertions(+), 22 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index a9cae85..bc0dc98 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -190,12 +190,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (nodes != null) {
nodes.removeIf((node) -> node.getNodeProps().getCoreNodeName().equals(
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor().getCoreNodeName()));
-//
-//// if (nodes.size() == 0) {
-//// log.info("Found no other shards or replicas, local commit liveNodes={} clusterstate={}", clusterState.getLiveNodes(), clusterState.getCollection(collection));
-//// doLocalCommit(cmd);
-//// return;
-//// }
}
try {
@@ -242,30 +236,40 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
if (isLeader) {
- log.info("Do a local commit on NRT endpoint for leader");
- try {
- doLocalCommit(cmd);
- } catch (Exception e) {
- log.error("Error on local commit");
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
+ try (ParWork worker = new ParWork(this)) {
+ worker.collect(() -> {
+ log.info("Do a local commit on NRT endpoint for leader");
+ try {
+ doLocalCommit(cmd);
+ } catch (Exception e) {
+ log.error("Error on local commit");
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ });
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(COMMIT_END_POINT, "replicas");
+ params.set(COMMIT_END_POINT, "replicas");
- useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+ useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(),
+ leaderReplica);
- log.info("Found the following replicas to send commit to {}", useNodes);
+ log.info("Found the following replicas to send commit to {}",
+ useNodes);
- if (useNodes != null && useNodes.size() > 0) {
- log.info("send commit to replicas nodes={}", useNodes);
+ if (useNodes != null && useNodes.size() > 0) {
+ log.info("send commit to replicas nodes={}", useNodes);
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM, ZkCoreNodeProps
+ .getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
- cmdDistrib.distribCommit(cmd, useNodes, params);
+ List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
+ worker.collect(() -> {
+ cmdDistrib.distribCommit(cmd, finalUseNodes, params);
+ });
+ }
+ worker.addCollect("distCommit");
}
}