You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/03/30 17:49:14 UTC
[solr] branch branch_9x updated: SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
This is an automated email from the ASF dual-hosted git repository.
magibney pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new ca1dfd3 SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
ca1dfd3 is described below
commit ca1dfd3640c2a5dae8d3435a47b9aece9bb21c2b
Author: Michael Gibney <mi...@michaelgibney.net>
AuthorDate: Tue Jan 25 13:59:11 2022 -0500
SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
NOTE: base for this backport commit was cherry-picked from
d195e4c2b1d5f57cc33b7f75d91a483d5abda4b2; but the commit was
amended as appropriate to include updates to the relevant
files on `main` through b39bd764609d47a61bc26ddf3aa29c07af2547fa
---
solr/CHANGES.txt | 3 +
.../processor/DistributedZkUpdateProcessor.java | 26 +++-
.../conf/solrconfig-parallel-commit.xml | 52 ++++++++
.../solr/cloud/ParallelCommitExecutionTest.java | 140 +++++++++++++++++++++
4 files changed, 216 insertions(+), 5 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8339492..b0705c4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -34,6 +34,9 @@ Bug Fixes
* SOLR-16112: DefaultSolrHighlighter.doHighlighting to Query#rewrite multiple times if necessary. (Christine Poerschke)
+* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
+ halving the latency of synchronous commits (Michael Gibney)
+
Other Changes
---------------------
* SOLR-15897: Remove <jmx/> from all unit test solrconfig.xml files. (Eric Pugh)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 9dff839..88e3884 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -207,6 +207,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// zk
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ // TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045
+ boolean issuedDistribCommit = false;
List<SolrCmdDistributor.Node> useNodes = null;
if (req.getParams().get(COMMIT_END_POINT) == null) {
useNodes = nodes;
@@ -217,11 +219,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
DISTRIB_FROM,
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribCommit(cmd, useNodes, params);
- cmdDistrib.blockAndDoRetries();
+ issuedDistribCommit = true;
}
}
if (isLeader) {
+ if (issuedDistribCommit) {
+ // defensive copy of params, which was passed into distribCommit(...) above; will
+ // unconditionally replace DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the
+ // new `params` val will actually be used
+ params = new ModifiableSolrParams(params);
+ }
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(COMMIT_END_POINT, "replicas");
@@ -233,14 +241,22 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
DISTRIB_FROM,
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
+ // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER
+ // distrib commits
cmdDistrib.distribCommit(cmd, useNodes, params);
+ issuedDistribCommit = true;
}
doLocalCommit(cmd);
-
- if (useNodes != null) {
- cmdDistrib.blockAndDoRetries();
- }
+ }
+ if (issuedDistribCommit) {
+ // TODO: according to discussion on SOLR-15045, this call (and all tracking of
+ // `issuedDistribCommit`) may well be superfluous, and can probably simply be removed. It is
+ // left in place for now, intentionally punting on the question of whether this internal
+ // `blockAndDoRetries()` is necessary. At worst, its presence is misleading; but it should
+ // be harmless, and allows the change fixing SOLR-15045 to be as tightly scoped as possible,
+ // leaving the behavior of the code otherwise functionally equivalent (for better or worse!)
+ cmdDistrib.blockAndDoRetries();
}
}
}
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
new file mode 100644
index 0000000..3e61994
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Test Config for verifying parallel commit execution (see ParallelCommitExecutionTest)
+ -->
+<config>
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
+ <requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
+ <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <updateLog enable="${enable.update.log:true}">
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+
+ </updateHandler>
+
+ <requestHandler name="/update" class="solr.UpdateRequestHandler">
+ <lst name="invariants">
+ <str name="update.chain">ensure-parallel-commit</str>
+ </lst>
+ </requestHandler>
+
+ <updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>
+
+ <updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
+ <processor class="solr.RunUpdateProcessorFactory"/>
+ </updateRequestProcessorChain>
+</config>
diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
new file mode 100644
index 0000000..f84baff
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelCommitExecutionTest extends SolrCloudTestCase {
+
+ private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+ private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
+
+ /** A basic client for operations at the cloud level, default collection will be set */
+ private static CloudSolrClient CLOUD_CLIENT;
+
+ private static int expectCount;
+
+ private static volatile CountDownLatch countdown;
+ private static final AtomicInteger countup = new AtomicInteger();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // the number of replicas matters; for the initial parallel commit execution tests, only
+ // consider repFactor=1
+ final int repFactor = 1; // random().nextBoolean() ? 1 : 2;
+ final int numShards = TestUtil.nextInt(random(), 1, 4);
+ final int numNodes = (numShards * repFactor);
+ expectCount = numNodes;
+
+ final String configName = DEBUG_LABEL + "_config-set";
+ final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+ configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+ Map<String, String> collectionProperties = new LinkedHashMap<>();
+ collectionProperties.put("config", "solrconfig-parallel-commit.xml");
+ collectionProperties.put("schema", "schema_latest.xml");
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+ .setProperties(collectionProperties)
+ .process(cluster.getSolrClient());
+
+ CLOUD_CLIENT = cluster.getSolrClient();
+ CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+ waitForRecoveriesToFinish(CLOUD_CLIENT);
+ }
+
+ @AfterClass
+ private static void afterClass() throws Exception {
+ if (null != CLOUD_CLIENT) {
+ CLOUD_CLIENT.close();
+ CLOUD_CLIENT = null;
+ }
+ }
+
+ private static void initSyncVars() {
+ final int ct;
+ ct = expectCount;
+ countdown = new CountDownLatch(ct);
+ countup.set(0);
+ }
+
+ @Test
+ public void testParallelOk() throws Exception {
+ initSyncVars();
+ CLOUD_CLIENT.commit(true, true);
+ assertEquals(0, countdown.getCount());
+ assertEquals(expectCount, countup.get());
+ }
+
+ public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+ assert null != client.getDefaultCollection();
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+ client.getDefaultCollection(), ZkStateReader.from(client), true, true, 330);
+ }
+
+ public static class CheckFactory extends UpdateRequestProcessorFactory {
+ @Override
+ public UpdateRequestProcessor getInstance(
+ SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ return new Check(next);
+ }
+ }
+
+ public static class Check extends UpdateRequestProcessor {
+
+ public Check(UpdateRequestProcessor next) {
+ super(next);
+ }
+
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ super.processCommit(cmd);
+ countdown.countDown();
+ try {
+ // NOTE: this ensures that all commits are executed in parallel; no commit can complete
+ // successfully until all commits have entered the `processCommit(...)` method.
+ if (!countdown.await(5, TimeUnit.SECONDS)) {
+ throw new RuntimeException("done waiting");
+ }
+ countup.incrementAndGet();
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+}