You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/03/30 17:49:14 UTC

[solr] branch branch_9x updated: SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)

This is an automated email from the ASF dual-hosted git repository.

magibney pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new ca1dfd3  SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
ca1dfd3 is described below

commit ca1dfd3640c2a5dae8d3435a47b9aece9bb21c2b
Author: Michael Gibney <mi...@michaelgibney.net>
AuthorDate: Tue Jan 25 13:59:11 2022 -0500

    SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
    
    NOTE: base for this backport commit was cherry-picked from
    d195e4c2b1d5f57cc33b7f75d91a483d5abda4b2; but the commit was
    amended as appropriate to include updates to the relevant
    files on `main` through b39bd764609d47a61bc26ddf3aa29c07af2547fa
---
 solr/CHANGES.txt                                   |   3 +
 .../processor/DistributedZkUpdateProcessor.java    |  26 +++-
 .../conf/solrconfig-parallel-commit.xml            |  52 ++++++++
 .../solr/cloud/ParallelCommitExecutionTest.java    | 140 +++++++++++++++++++++
 4 files changed, 216 insertions(+), 5 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8339492..b0705c4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -34,6 +34,9 @@ Bug Fixes
 
 * SOLR-16112: DefaultSolrHighlighter.doHighlighting to Query#rewrite multiple times if necessary. (Christine Poerschke)
 
+* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
+  reducing the latency of synchronous commits by up to half (Michael Gibney)
+
 Other Changes
 ---------------------
 * SOLR-15897: Remove <jmx/> from all unit test solrconfig.xml files. (Eric Pugh)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 9dff839..88e3884 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -207,6 +207,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       // zk
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
 
+      // TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045
+      boolean issuedDistribCommit = false;
       List<SolrCmdDistributor.Node> useNodes = null;
       if (req.getParams().get(COMMIT_END_POINT) == null) {
         useNodes = nodes;
@@ -217,11 +219,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
               DISTRIB_FROM,
               ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
           cmdDistrib.distribCommit(cmd, useNodes, params);
-          cmdDistrib.blockAndDoRetries();
+          issuedDistribCommit = true;
         }
       }
 
       if (isLeader) {
+        if (issuedDistribCommit) {
+          // defensive copy: `params` was already passed to distribCommit(...) above, so modify a
+          // copy instead. DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM are all
+          // unconditionally overwritten below wherever the copied `params` is actually used
+          params = new ModifiableSolrParams(params);
+        }
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
 
         params.set(COMMIT_END_POINT, "replicas");
@@ -233,14 +241,22 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
               DISTRIB_FROM,
               ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
 
+          // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER
+          // distrib commits
           cmdDistrib.distribCommit(cmd, useNodes, params);
+          issuedDistribCommit = true;
         }
 
         doLocalCommit(cmd);
-
-        if (useNodes != null) {
-          cmdDistrib.blockAndDoRetries();
-        }
+      }
+      if (issuedDistribCommit) {
+        // TODO: according to discussion on SOLR-15045, this call (and all tracking of
+        // `issuedDistribCommit`) may well be superfluous, and can probably simply be removed. It is
+        // left in place for now, intentionally punting on the question of whether this internal
+        // `blockAndDoRetries()` is necessary. At worst, its presence is misleading; but it should
+        // be harmless, and allows the change fixing SOLR-15045 to be as tightly scoped as possible,
+        // leaving the behavior of the code otherwise functionally equivalent (for better or worse!)
+        cmdDistrib.blockAndDoRetries();
       }
     }
   }
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
new file mode 100644
index 0000000..3e61994
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+   Test config for ParallelCommitExecutionTest: /update runs a chain that checks commits execute in parallel
+  -->
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
+  <requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+  
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog enable="${enable.update.log:true}">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+
+  </updateHandler>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <lst name="invariants">
+      <str name="update.chain">ensure-parallel-commit</str>
+    </lst>
+  </requestHandler>
+
+  <updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>
+
+  <updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+</config>
diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
new file mode 100644
index 0000000..f84baff
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelCommitExecutionTest extends SolrCloudTestCase {
+
+  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+  private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
+
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+
+  private static int expectCount;
+
+  private static volatile CountDownLatch countdown;
+  private static final AtomicInteger countup = new AtomicInteger();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // multiple replicas matter, but the initial parallel commit execution tests only consider
+    // repFactor=1
+    final int repFactor = 1; // random().nextBoolean() ? 1 : 2;
+    final int numShards = TestUtil.nextInt(random(), 1, 4);
+    final int numNodes = (numShards * repFactor);
+    expectCount = numNodes;
+
+    final String configName = DEBUG_LABEL + "_config-set";
+    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+    configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+    Map<String, String> collectionProperties = new LinkedHashMap<>();
+    collectionProperties.put("config", "solrconfig-parallel-commit.xml");
+    collectionProperties.put("schema", "schema_latest.xml");
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setProperties(collectionProperties)
+        .process(cluster.getSolrClient());
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+  }
+
+  @AfterClass
+  private static void afterClass() throws Exception {
+    if (null != CLOUD_CLIENT) {
+      CLOUD_CLIENT.close();
+      CLOUD_CLIENT = null;
+    }
+  }
+
+  private static void initSyncVars() {
+    final int ct;
+    ct = expectCount;
+    countdown = new CountDownLatch(ct);
+    countup.set(0);
+  }
+
+  @Test
+  public void testParallelOk() throws Exception {
+    initSyncVars();
+    CLOUD_CLIENT.commit(true, true);
+    assertEquals(0, countdown.getCount());
+    assertEquals(expectCount, countup.get());
+  }
+
+  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+    assert null != client.getDefaultCollection();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+        client.getDefaultCollection(), ZkStateReader.from(client), true, true, 330);
+  }
+
+  public static class CheckFactory extends UpdateRequestProcessorFactory {
+    @Override
+    public UpdateRequestProcessor getInstance(
+        SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return new Check(next);
+    }
+  }
+
+  public static class Check extends UpdateRequestProcessor {
+
+    public Check(UpdateRequestProcessor next) {
+      super(next);
+    }
+
+    @Override
+    public void processCommit(CommitUpdateCommand cmd) throws IOException {
+      super.processCommit(cmd);
+      countdown.countDown();
+      try {
+        // NOTE: this ensures that all commits are executed in parallel; no commit can complete
+        // successfully until all `expectCount` commits have returned from `super.processCommit`.
+        if (!countdown.await(5, TimeUnit.SECONDS)) {
+          throw new RuntimeException("done waiting");
+        }
+        countup.incrementAndGet();
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+}