Posted to commits@solr.apache.org by ho...@apache.org on 2023/01/20 20:37:24 UTC

[solr] branch branch_9x updated: SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 6d3aadb96bc SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.
6d3aadb96bc is described below

commit 6d3aadb96bcdc43a94ec727d1e95134a45c41e85
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Fri Jan 20 12:30:33 2023 -0700

    SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.
    
    This behavior can also be configured per AbstractUpdateRequest instance.
    
    (cherry picked from commit afb980bc217cb492b24e002c99695e28794aceb9)
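
As a rough usage sketch (not part of this commit), a Cloud client can now be built so that isUpdatesToLeaders() reports false; the class name, ZooKeeper address, and collection name below are placeholders:

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;

    public class UpdatesToAnyReplicaExample {
      public static void main(String[] args) throws Exception {
        // "localhost:9983" is a placeholder ZooKeeper address
        try (CloudHttp2SolrClient client =
            new CloudHttp2SolrClient.Builder(
                    Collections.singletonList("localhost:9983"), Optional.empty())
                .sendUpdatesToAnyReplica() // isUpdatesToLeaders() will now return false
                .build()) {

          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", "example-doc");

          // Updates from this client follow normal routing preferences
          // (e.g. shards.preference) instead of being forced to shard leaders.
          UpdateRequest req = new UpdateRequest();
          req.add(doc);
          req.process(client, "example_collection"); // placeholder collection name
        }
      }
    }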
---
 solr/CHANGES.txt                                   |   3 +
 .../client/solrj/impl/CloudHttp2SolrClient.java    |  30 +
 .../client/solrj/impl/CloudLegacySolrClient.java   |  40 +-
 .../solr/client/solrj/impl/CloudSolrClient.java    |  35 +-
 .../solrj/request/AbstractUpdateRequest.java       |  11 +
 .../solr/client/solrj/request/IsUpdateRequest.java |  15 +-
 .../configsets/tracking-updates/conf/schema.xml    |  29 +
 .../tracking-updates/conf/solrconfig.xml           |  63 +++
 .../solrj/impl/CloudHttp2SolrClientTest.java       |   1 +
 .../client/solrj/impl/CloudSolrClientTest.java     |   1 +
 .../impl/SendUpdatesToLeadersOverrideTest.java     | 624 +++++++++++++++++++++
 .../src/java/org/apache/solr/SolrTestCaseJ4.java   |  10 +-
 .../processor/RecordingUpdateProcessorFactory.java |   0
 .../processor/TrackingUpdateProcessorFactory.java  |   0
 14 files changed, 843 insertions(+), 19 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ed6def4feaf..1c1e56f64b5 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -97,6 +97,9 @@ Improvements
 * SOLR-15787: FileSystemConfigSetService: implement the abstraction completely.  It could be useful
   for putting ConfigSets on a shared file system.  (Nazerke Seidan, David Smiley)
 
+* SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.  This behavior can also be
+  configured per AbstractUpdateRequest instance.  (hossman)
+
 Optimizations
 ---------------------
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index fa602215d26..89a08280535 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -206,12 +206,39 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
       if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
     }
 
+    /**
+     * Tells {@link Builder} that created clients should be configured such that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
+     *
+     * @see #sendUpdatesToAnyReplica
+     * @see CloudSolrClient#isUpdatesToLeaders
+     */
+    public Builder sendUpdatesOnlyToShardLeaders() {
+      shardLeadersOnly = true;
+      return this;
+    }
+
+    /**
+     * Tells {@link Builder} that created clients should be configured such that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
+     *
+     * @see #sendUpdatesOnlyToShardLeaders
+     * @see CloudSolrClient#isUpdatesToLeaders
+     */
+    public Builder sendUpdatesToAnyReplica() {
+      shardLeadersOnly = false;
+      return this;
+    }
+
     /**
      * Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to
      * shard leaders only.
      *
      * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a
      * {@link SolrException}
+     *
+     * @see #sendDirectUpdatesToAnyShardReplica
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
      */
     public Builder sendDirectUpdatesToShardLeadersOnly() {
       directUpdatesToLeadersOnly = true;
@@ -224,6 +251,9 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
      *
      * <p>Shard leaders are still preferred, but the created clients will fallback to using other
      * replicas if a leader cannot be found.
+     *
+     * @see #sendDirectUpdatesToShardLeadersOnly
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
      */
     public Builder sendDirectUpdatesToAnyShardReplica() {
       directUpdatesToLeadersOnly = false;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
index 983d959434c..eac2326a330 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
@@ -283,9 +283,11 @@ public class CloudLegacySolrClient extends CloudSolrClient {
     }
 
     /**
-     * Tells {@link Builder} that created clients should send updates only to shard leaders.
+     * Tells {@link Builder} that created clients should be configured such that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
      *
-     * <p>WARNING: This method currently has no effect. See SOLR-6312 for more information.
+     * @see #sendUpdatesToAnyReplica
+     * @see CloudSolrClient#isUpdatesToLeaders
      */
     public Builder sendUpdatesOnlyToShardLeaders() {
       shardLeadersOnly = true;
@@ -293,20 +295,45 @@ public class CloudLegacySolrClient extends CloudSolrClient {
     }
 
     /**
-     * Tells {@link Builder} that created clients should send updates to all replicas for a shard.
+     * Tells {@link Builder} that created clients should be configured such that {@link
+     * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
      *
-     * <p>WARNING: This method currently has no effect. See SOLR-6312 for more information.
+     * @see #sendUpdatesOnlyToShardLeaders
+     * @see CloudSolrClient#isUpdatesToLeaders
      */
-    public Builder sendUpdatesToAllReplicasInShard() {
+    public Builder sendUpdatesToAnyReplica() {
       shardLeadersOnly = false;
       return this;
     }
 
+    /**
+     * This method has no effect.
+     *
+     * <p>In older versions of Solr, this method was an incorrectly named equivalent to {@link
+     * #sendUpdatesToAnyReplica}, which had no effect because that setting was ignored in the
+     * created clients. When the underlying {@link CloudSolrClient} behavior was fixed, this method
+     * was modified to be an explicit No-Op, since the implied behavior of sending updates to
+     * <em>all</em> replicas has never been supported, and was never intended to be supported.
+     *
+     * @see #sendUpdatesOnlyToShardLeaders
+     * @see #sendUpdatesToAnyReplica
+     * @see CloudSolrClient#isUpdatesToLeaders
+     * @see <a href="https://issues.apache.org/jira/browse/SOLR-6312">SOLR-6312</a>
+     * @deprecated Never supported
+     */
+    @Deprecated
+    public Builder sendUpdatesToAllReplicasInShard() {
+      return this;
+    }
+
     /**
      * Tells {@link Builder} that created clients should send direct updates to shard leaders only.
      *
      * <p>UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a
      * {@link SolrException}
+     *
+     * @see #sendDirectUpdatesToAnyShardReplica
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
      */
     public Builder sendDirectUpdatesToShardLeadersOnly() {
       directUpdatesToLeadersOnly = true;
@@ -319,6 +346,9 @@ public class CloudLegacySolrClient extends CloudSolrClient {
      *
      * <p>Shard leaders are still preferred, but the created clients will fallback to using other
      * replicas if a leader cannot be found.
+     *
+     * @see #sendDirectUpdatesToShardLeadersOnly
+     * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
      */
     public Builder sendDirectUpdatesToAnyShardReplica() {
       directUpdatesToLeadersOnly = false;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 9c1c1db83ed..5a146e70a81 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -847,11 +847,10 @@ public abstract class CloudSolrClient extends SolrClient {
       isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
     }
     boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
-    boolean isUpdate = (request instanceof IsUpdateRequest) && (request instanceof UpdateRequest);
     if (!inputCollections.isEmpty()
         && !isAdmin
         && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
-      Set<String> requestedCollectionNames = resolveAliases(inputCollections, isUpdate);
+      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
 
       StringBuilder stateVerParamBuilder = null;
       for (String requestedCollection : requestedCollectionNames) {
@@ -1048,11 +1047,12 @@ public abstract class CloudSolrClient extends SolrClient {
     connect();
 
     boolean sendToLeaders = false;
-    boolean isUpdate = false;
 
     if (request instanceof IsUpdateRequest) {
-      if (request instanceof UpdateRequest) {
-        isUpdate = true;
+      sendToLeaders = ((IsUpdateRequest) request).isSendToLeaders() && this.isUpdatesToLeaders();
+
+      // Check if we can do a "directUpdate" ...
+      if (sendToLeaders && request instanceof UpdateRequest) {
         if (inputCollections.size() > 1) {
           throw new SolrException(
               SolrException.ErrorCode.BAD_REQUEST,
@@ -1069,7 +1069,6 @@ public abstract class CloudSolrClient extends SolrClient {
           return response;
         }
       }
-      sendToLeaders = true;
     }
 
     SolrParams reqParams = request.getParams();
@@ -1099,7 +1098,7 @@ public abstract class CloudSolrClient extends SolrClient {
       }
 
     } else { // Typical...
-      Set<String> collectionNames = resolveAliases(inputCollections, isUpdate);
+      Set<String> collectionNames = resolveAliases(inputCollections);
       if (collectionNames.isEmpty()) {
         throw new SolrException(
             SolrException.ErrorCode.BAD_REQUEST,
@@ -1194,7 +1193,7 @@ public abstract class CloudSolrClient extends SolrClient {
    * Resolves the input collections to their possible aliased collections. Doesn't validate
    * collection existence.
    */
-  private Set<String> resolveAliases(List<String> inputCollections, boolean isUpdate) {
+  private Set<String> resolveAliases(List<String> inputCollections) {
     if (inputCollections.isEmpty()) {
       return Collections.emptySet();
     }
@@ -1210,11 +1209,31 @@ public abstract class CloudSolrClient extends SolrClient {
     return uniqueNames;
   }
 
+  /**
+   * If true, this client has been configured such that it will generally prefer to send {@link
+   * IsUpdateRequest} requests to a shard leader, if and only if {@link
+   * IsUpdateRequest#isSendToLeaders} is also true. If false, then this client has been configured
+   * to obey normal routing preferences when dealing with {@link IsUpdateRequest} requests.
+   *
+   * @see #isDirectUpdatesToLeadersOnly
+   */
   public boolean isUpdatesToLeaders() {
     return updatesToLeaders;
   }
 
   /**
+   * If true, this client has been configured such that "direct updates" will <em>only</em> be sent
+   * to the current leader of the corresponding shard, and will not be retried with other replicas.
+   * This setting has no effect if {@link #isUpdatesToLeaders()} or {@link
+   * IsUpdateRequest#isSendToLeaders} returns false.
+   *
+   * <p>A "direct update" is any update that can be sent directly to a single shard, and does not
+   * need to be broadcast to every shard. (Example: document updates or "delete by id" when using
+   * the default router; non-direct updates are things like commits and "delete by query").
+   *
+   * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct updates" for different
+   * shards, this client may break the request up and merge the responses.
+   *
    * @return true if direct updates are sent to shard leaders only
    */
   public boolean isDirectUpdatesToLeadersOnly() {
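
The effective routing decision combines the client-level and request-level flags; a hedged paraphrase of the check added to sendRequest above (the sketch class name is illustrative only):

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.IsUpdateRequest;

    class LeaderRoutingSketch {
      // Mirrors the check added to CloudSolrClient#sendRequest: leader-preferring
      // routing applies only when both the request and the client ask for it.
      // (Direct-update "fail fast" additionally requires isDirectUpdatesToLeadersOnly().)
      static boolean prefersLeaders(CloudSolrClient client, IsUpdateRequest request) {
        return request.isSendToLeaders() && client.isUpdatesToLeaders();
      }
    }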
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
index 352fd290f59..733502353dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
@@ -159,4 +159,15 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse>
     this.commitWithin = commitWithin;
     return this;
   }
+
+  private boolean sendToLeaders = true;
+
+  public boolean isSendToLeaders() {
+    return sendToLeaders;
+  }
+
+  public AbstractUpdateRequest setSendToLeaders(final boolean sendToLeaders) {
+    this.sendToLeaders = sendToLeaders;
+    return this;
+  }
 }
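
A small illustrative sketch of the per-request override added above (the class name, collection name, and id are placeholders); note that setParam(), unlike setSendToLeaders(), is not fluent:

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.UpdateRequest;

    class PerRequestOverrideSketch {
      // Opts a single delete-by-id out of leader routing, even if the client
      // was built with sendUpdatesOnlyToShardLeaders().
      static void deleteViaAnyReplica(CloudSolrClient client, String id) throws Exception {
        UpdateRequest req = new UpdateRequest();
        req.deleteById(id);
        req.setSendToLeaders(false); // per-request override of the client default
        req.setParam("shards.preference", "replica.type:PULL"); // honored once leader routing is off
        req.process(client, "example_collection"); // placeholder collection name
      }
    }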
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
index 5e8c8aff854..54f449e1358 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
@@ -16,5 +16,18 @@
  */
 package org.apache.solr.client.solrj.request;
 
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
 /** Marker class so that we can determine which requests are updates. */
-public interface IsUpdateRequest {}
+public interface IsUpdateRequest {
+
+  /**
+   * Indicates if clients should attempt to route this request to a shard leader, overriding
+   * typical client routing preferences for requests. Defaults to true.
+   *
+   * @see CloudSolrClient#isUpdatesToLeaders
+   */
+  default boolean isSendToLeaders() {
+    return true;
+  }
+}
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml
new file mode 100644
index 00000000000..4124feab0c3
--- /dev/null
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <dynamicField name="*_s"  type="string"  indexed="true"  stored="true" />
+  <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml
new file mode 100644
index 00000000000..d6df57ba134
--- /dev/null
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with tracking enabled on the URP, both before and after distrib -->
+
+<config>
+
+  <updateRequestProcessorChain default="true">
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="solr.TrackingUpdateProcessorFactory">
+      <str name="group">pre-distrib</str>
+    </processor>
+    <processor class="solr.DistributedUpdateProcessorFactory" />
+    <processor class="solr.TrackingUpdateProcessorFactory">
+      <str name="group">post-distrib</str>
+    </processor>
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+  
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+  <indexConfig>
+    <mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
+  </indexConfig>
+</config>
+
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index f994a216c6b..92409833464 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -304,6 +304,7 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
     try (CloudSolrClient threadedClient =
         new RandomizingCloudSolrClientBuilder(
                 Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesOnlyToShardLeaders()
             .withParallelUpdates(true)
             .build()) {
       threadedClient.setDefaultCollection("routing_collection");
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 61d2a60aaec..125c8a3580d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -305,6 +305,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     try (CloudSolrClient threadedClient =
         new RandomizingCloudSolrClientBuilder(
                 Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesOnlyToShardLeaders()
             .withParallelUpdates(true)
             .build()) {
       threadedClient.setDefaultCollection("routing_collection");
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
new file mode 100644
index 00000000000..12009c6592f
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.update.processor.TrackingUpdateProcessorFactory;
+import org.hamcrest.MatcherAssert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behavior of {@link CloudSolrClient#isUpdatesToLeaders} and {@link
+ * IsUpdateRequest#isSendToLeaders}.
+ *
+ * <p>This class uses {@link TrackingUpdateProcessorFactory} instances (configured both before, and
+ * after the <code>distrib</code> processor) to inspect which replicas receive various {@link
+ * UpdateRequest}s from variously configured {@link CloudSolrClient}s. In some requests, <code>
+ * shards.preference=replica.type:PULL</code> is specified to confirm that typical routing
+ * preferences are respected (when the effective value of <code>isSendToLeaders</code> is <code>false
+ * </code>)
+ */
+public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String CONFIG = "tracking-updates";
+  private static final String COLLECTION_NAME = "the_collection";
+
+  private static final Set<String> LEADER_CORE_NAMES = new HashSet<>();
+  private static final Set<String> PULL_REPLICA_CORE_NAMES = new HashSet<>();
+
+  @AfterClass
+  public static void cleanupExpectedCoreNames() throws Exception {
+    LEADER_CORE_NAMES.clear();
+    PULL_REPLICA_CORE_NAMES.clear();
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    assert LEADER_CORE_NAMES.isEmpty();
+    assert PULL_REPLICA_CORE_NAMES.isEmpty();
+
+    final int numNodes = 4;
+    configureCluster(numNodes)
+        .addConfig(
+            CONFIG,
+            getFile("solrj")
+                .toPath()
+                .resolve("solr")
+                .resolve("configsets")
+                .resolve(CONFIG)
+                .resolve("conf"))
+        .configure();
+
+    // create 2 shard collection with 1 NRT (leader) and 1 PULL replica
+    assertTrue(
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG, 2, 1)
+            .setPullReplicas(1)
+            .setNrtReplicas(1)
+            .process(cluster.getSolrClient())
+            .isSuccess());
+
+    final List<Replica> allReplicas =
+        cluster.getSolrClient().getClusterState().getCollection(COLLECTION_NAME).getReplicas();
+    assertEquals(
+        "test preconditions were broken, each replica should have it's own node",
+        numNodes,
+        allReplicas.size());
+
+    allReplicas.stream()
+        .filter(Replica::isLeader)
+        .map(Replica::getCoreName)
+        .collect(Collectors.toCollection(() -> LEADER_CORE_NAMES));
+
+    allReplicas.stream()
+        .filter(r -> Replica.Type.PULL.equals(r.getType()))
+        .map(Replica::getCoreName)
+        .collect(Collectors.toCollection(() -> PULL_REPLICA_CORE_NAMES));
+
+    log.info("Leader coreNames={}", LEADER_CORE_NAMES);
+    log.info("PULL Replica coreNames={}", PULL_REPLICA_CORE_NAMES);
+  }
+
+  /**
+   * Helper that stops recording and returns an unmodifiable list of the core names from each
+   * recorded command
+   */
+  private static List<String> stopRecording(final String group) {
+    return TrackingUpdateProcessorFactory.stopRecording(group).stream()
+        .map(
+            uc ->
+                uc.getReq()
+                    .getContext()
+                    .get(TrackingUpdateProcessorFactory.REQUEST_NODE)
+                    .toString())
+        .collect(Collectors.toUnmodifiableList());
+  }
+
+  /** Convenience class for making assertions about the updates that were processed */
+  private static class RecordingResults {
+    public final List<UpdateCommand> preDistribCommands;
+    public final List<UpdateCommand> postDistribCommands;
+
+    public final Map<SolrQueryRequest, List<UpdateCommand>> preDistribRequests;
+    public final Map<SolrQueryRequest, List<UpdateCommand>> postDistribRequests;
+
+    public final Map<String, List<SolrQueryRequest>> preDistribCores;
+    public final Map<String, List<SolrQueryRequest>> postDistribCores;
+
+    private static Map<SolrQueryRequest, List<UpdateCommand>> mapReqsToCommands(
+        final List<UpdateCommand> commands) {
+      return commands.stream().collect(Collectors.groupingBy(UpdateCommand::getReq));
+    }
+
+    private static Map<String, List<SolrQueryRequest>> mapCoresToReqs(
+        final Collection<SolrQueryRequest> reqs) {
+      return reqs.stream()
+          .collect(
+              Collectors.groupingBy(
+                  r -> r.getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE).toString()));
+    }
+
+    public RecordingResults(
+        final List<UpdateCommand> preDistribCommands,
+        final List<UpdateCommand> postDistribCommands) {
+      this.preDistribCommands = preDistribCommands;
+      this.postDistribCommands = postDistribCommands;
+
+      this.preDistribRequests = mapReqsToCommands(preDistribCommands);
+      this.postDistribRequests = mapReqsToCommands(postDistribCommands);
+
+      this.preDistribCores = mapCoresToReqs(preDistribRequests.keySet());
+      this.postDistribCores = mapCoresToReqs(postDistribRequests.keySet());
+    }
+  }
+
+  /**
+   * Given an {@link AbstractUpdateRequest} and a {@link SolrClient}, processes that request against
+   * that client while {@link TrackingUpdateProcessorFactory} is recording, does some basic
+   * validation, and returns the recorded <code>pre-distrib</code> and <code>post-distrib</code>
+   * results as a {@link RecordingResults}
+   */
+  private static RecordingResults assertUpdateWithRecording(
+      final AbstractUpdateRequest req, final SolrClient client) throws Exception {
+
+    TrackingUpdateProcessorFactory.startRecording("pre-distrib");
+    TrackingUpdateProcessorFactory.startRecording("post-distrib");
+
+    assertEquals(0, req.process(client, COLLECTION_NAME).getStatus());
+
+    final RecordingResults results =
+        new RecordingResults(
+            TrackingUpdateProcessorFactory.stopRecording("pre-distrib"),
+            TrackingUpdateProcessorFactory.stopRecording("post-distrib"));
+
+    // post-distrib should never match any PULL replicas, regardless of request; if this fails,
+    // something is seriously wrong with our cluster
+    MatcherAssert.assertThat(
+        "post-distrib should never be PULL replica",
+        results.postDistribCores.keySet(),
+        everyItem(not(isIn(PULL_REPLICA_CORE_NAMES))));
+
+    return results;
+  }
+
+  /**
+   * Since {@link AbstractUpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for
+   * setting <code>shards.preference=replica.type:PULL</code> on the input req, and then returning
+   * that req
+   */
+  private static AbstractUpdateRequest prefPull(final AbstractUpdateRequest req) {
+    req.setParam("shards.preference", "replica.type:PULL");
+    return req;
+  }
+
+  public void testBuilderImplicitBehavior() throws Exception {
+    try (CloudSolrClient client =
+        new CloudLegacySolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .build()) {
+      assertTrue(client.isUpdatesToLeaders());
+    }
+    try (CloudSolrClient client =
+        new CloudHttp2SolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .build()) {
+      assertTrue(client.isUpdatesToLeaders());
+    }
+  }
+
+  public void testLegacyClientThatDefaultsToLeaders() throws Exception {
+    try (CloudSolrClient client =
+        new CloudLegacySolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesOnlyToShardLeaders()
+            .build()) {
+      checkUpdatesDefaultToLeaders(client);
+      checkUpdatesWithSendToLeadersFalse(client);
+    }
+  }
+
+  public void testLegacyClientThatDoesNotDefaultToLeaders() throws Exception {
+    try (CloudSolrClient client =
+        new CloudLegacySolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesToAnyReplica()
+            .build()) {
+      checkUpdatesWithShardsPrefPull(client);
+      checkUpdatesWithSendToLeadersFalse(client);
+    }
+  }
+
+  public void testHttp2ClientThatDefaultsToLeaders() throws Exception {
+    try (CloudSolrClient client =
+        new CloudHttp2SolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesOnlyToShardLeaders()
+            .build()) {
+      checkUpdatesDefaultToLeaders(client);
+      checkUpdatesWithSendToLeadersFalse(client);
+    }
+  }
+
+  public void testHttp2ClientThatDoesNotDefaultToLeaders() throws Exception {
+    try (CloudSolrClient client =
+        new CloudHttp2SolrClient.Builder(
+                Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+            .sendUpdatesToAnyReplica()
+            .build()) {
+      checkUpdatesWithShardsPrefPull(client);
+      checkUpdatesWithSendToLeadersFalse(client);
+    }
+  }
+
+  /**
+   * Given a SolrClient, sends various updates and asserts expectations regarding default behavior:
+   * that these requests will be initially sent to shard leaders, and "routed" requests will be sent
+   * to the leader for that route's shard
+   */
+  private void checkUpdatesDefaultToLeaders(final CloudSolrClient client) throws Exception {
+    assertTrue(
+        "broken test, only valid on clients where updatesToLeaders=true",
+        client.isUpdatesToLeaders());
+
+    { // single doc add is routable and should go to a single shard
+      final RecordingResults add =
+          assertUpdateWithRecording(new UpdateRequest().add(sdoc("id", "hoss")), client);
+
+      // single NRT leader is only core that should be involved at all
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+      MatcherAssert.assertThat(
+          "add pre-distrib must be leader",
+          add.preDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      assertEquals(
+          "add pre and post should match",
+          add.preDistribCores.keySet(),
+          add.postDistribCores.keySet());
+      assertEquals(
+          "add pre and post should be exact same reqs",
+          add.preDistribRequests.keySet(),
+          add.postDistribRequests.keySet());
+      // NOTE: we can't assert the pre/post commands are the same, because they add versioning
+
+      // whatever leader our add was routed to, a DBI for the same id should go to the same leader
+      final RecordingResults del =
+          assertUpdateWithRecording(new UpdateRequest().deleteById("hoss"), client);
+      assertEquals(
+          "del pre and post should match",
+          del.preDistribCores.keySet(),
+          del.postDistribCores.keySet());
+      assertEquals(
+          "add and del should have been routed the same",
+          add.preDistribCores.keySet(),
+          del.preDistribCores.keySet());
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+    }
+
+    { // DBQ should start on some leader, and then distrib to both leaders
+      final RecordingResults record =
+          assertUpdateWithRecording(new UpdateRequest().deleteByQuery("*:*"), client);
+
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib must be leader",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "dbq post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "dbq post-distrib size",
+          record.postDistribRequests.keySet(),
+          hasSize(LEADER_CORE_NAMES.size()));
+      MatcherAssert.assertThat(
+          "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+    }
+
+    { // When we have multiple direct updates for different shards, the client will
+      // split them and merge the responses.
+      //
+      // But we should still only see at most one pre request per shard leader
+
+      final RecordingResults record =
+          assertUpdateWithRecording(prefPull(createMultiDirectUpdates(100, 10)), client);
+
+      // NOTE: Don't assume our docIds are spread across multiple shards...
+      // ...but each original (pre-distrib) request should have gone to a distinct leader
+      MatcherAssert.assertThat(
+          "multi pre-distrib must be leaders",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "multi pre-distrib req != pre-distrib num cores",
+          record.preDistribRequests.keySet(),
+          hasSize(record.preDistribCores.keySet().size()));
+      MatcherAssert.assertThat(
+          "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+      assertEquals(
+          "multi post-distrib must be same leaders",
+          record.preDistribCores.keySet(),
+          record.postDistribCores.keySet());
+
+      // NOTE: we make no assertion about the number of post-distrib requests, just commands
+      // (distrib proc may batch differently than what we send)
+      assertEquals(
+          "multi post-distrib cores don't match pre-distrib cores",
+          record.preDistribCores.keySet(),
+          record.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+    }
+  }
+
+  /**
+   * Given a SolrClient, sends various updates using {@link #prefPull} and asserts expectations that
+   * these requests will initially be sent to PULL replicas
+   */
+  private void checkUpdatesWithShardsPrefPull(final CloudSolrClient client) throws Exception {
+
+    assertFalse(
+        "broken test, only valid on clients where updatesToLeaders=false",
+        client.isUpdatesToLeaders());
+
+    { // single doc add...
+      final RecordingResults add =
+          assertUpdateWithRecording(prefPull(new UpdateRequest().add(sdoc("id", "hoss"))), client);
+
+      // ...should start on (some) PULL replica, since we asked nicely
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "add pre-distrib must be PULL",
+          add.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+      // ...then be routed to single leader for this id
+      MatcherAssert.assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "add post-distrib must be leader",
+          add.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+      // A DBI should also start on (some) PULL replica, since we asked nicely.
+      //
+      // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+      final RecordingResults del =
+          assertUpdateWithRecording(prefPull(new UpdateRequest().deleteById("hoss")), client);
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "del pre-distrib must be PULL",
+          del.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "add and del should have same post-distrib leader",
+          add.postDistribCores.keySet(),
+          del.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+    }
+
+    { // DBQ should start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+      final RecordingResults record =
+          assertUpdateWithRecording(prefPull(new UpdateRequest().deleteByQuery("*:*")), client);
+
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "dbq post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "dbq post-distrib size",
+          record.postDistribRequests.keySet(),
+          hasSize(LEADER_CORE_NAMES.size()));
+      MatcherAssert.assertThat(
+          "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+    }
+
+    { // When sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+      // should still only go to one replica for all the "pre" commands, then be forwarded
+      // to the respective leaders for the "post" commands
+
+      final RecordingResults record =
+          assertUpdateWithRecording(prefPull(createMultiDirectUpdates(100, 10)), client);
+
+      MatcherAssert.assertThat(
+          "multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "multi pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+      assertEquals(
+          "multi post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      // NOTE: Don't assume our docIds are spread across multiple shards...
+      //
+      // We make no assertion about the number of post-distrib requests
+      // (distrib proc may batch differently than what we send)
+      MatcherAssert.assertThat(
+          "multi post-distrib cores",
+          record.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+    }
+  }
+
+  /**
+   * Given a SolrClient, sends various updates where {@link IsUpdateRequest#isSendToLeaders} returns
+   * false, and asserts expectations that requests using {@link #prefPull} are all sent to PULL
+   * replicas, regardless of how the client is configured.
+   */
+  private void checkUpdatesWithSendToLeadersFalse(final CloudSolrClient client) throws Exception {
+    { // single doc add...
+      final RecordingResults add =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().add(sdoc("id", "hoss"))).setSendToLeaders(false),
+              client);
+
+      // ...should start on (some) PULL replica, since we asked nicely
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "add pre-distrib must be PULL",
+          add.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+      // ...then be routed to single leader for this id
+      MatcherAssert.assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "add post-distrib must be leader",
+          add.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+      // A DBI should also start on (some) PULL replica, since we asked nicely.
+      //
+      // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+      final RecordingResults del =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().deleteById("hoss")).setSendToLeaders(false), client);
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "del pre-distrib must be PULL",
+          del.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "add and del should have same post-distrib leader",
+          add.postDistribCores.keySet(),
+          del.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+    }
+
+    { // DBQ should start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+      final RecordingResults record =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().deleteByQuery("*:*")).setSendToLeaders(false), client);
+
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "dbq post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      MatcherAssert.assertThat(
+          "dbq post-distrib size",
+          record.postDistribRequests.keySet(),
+          hasSize(LEADER_CORE_NAMES.size()));
+      MatcherAssert.assertThat(
+          "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+    }
+
+    { // When sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+      // should still only go to one replica for all the "pre" commands, then be forwarded
+      // to the respective leaders for the "post" commands
+
+      final RecordingResults record =
+          assertUpdateWithRecording(
+              prefPull(createMultiDirectUpdates(100, 10)).setSendToLeaders(false), client);
+
+      MatcherAssert.assertThat(
+          "multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "multi pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+      MatcherAssert.assertThat(
+          "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+      assertEquals(
+          "multi post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      // NOTE: Don't assume our docIds are spread across multiple shards...
+      //
+      // We make no assertion about the number of post-distrib requests
+      // (distrib proc may batch differently than what we send)
+      MatcherAssert.assertThat(
+          "multi post-distrib cores",
+          record.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      MatcherAssert.assertThat(
+          "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+    }
+  }
+
+  private static UpdateRequest createMultiDirectUpdates(final int numAdds, final int numDel) {
+    final UpdateRequest req = new UpdateRequest();
+    for (int i = 0; i < numAdds; i++) {
+      req.add(sdoc("id", "add" + i));
+    }
+    for (int i = 0; i < numDel; i++) {
+      req.deleteById("del" + i);
+    }
+    return req;
+  }
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index f05af27d6f7..6ae2eb76a2e 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -2704,7 +2704,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     }
     return new RandomizingCloudSolrClientBuilder(
             Collections.singletonList(zkHost), Optional.empty())
-        .sendUpdatesToAllReplicasInShard()
+        .sendUpdatesToAnyReplica()
         .build();
   }
 
@@ -2729,7 +2729,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     }
     return new RandomizingCloudSolrClientBuilder(
             Collections.singletonList(zkHost), Optional.empty())
-        .sendUpdatesToAllReplicasInShard()
+        .sendUpdatesToAnyReplica()
         .withSocketTimeout(socketTimeoutMillis)
         .build();
   }
@@ -2754,7 +2754,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     }
     return new RandomizingCloudSolrClientBuilder(
             Collections.singletonList(zkHost), Optional.empty())
-        .sendUpdatesToAllReplicasInShard()
+        .sendUpdatesToAnyReplica()
         .withConnectionTimeout(connectionTimeoutMillis)
         .withSocketTimeout(socketTimeoutMillis)
         .build();
@@ -2777,7 +2777,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     return new RandomizingCloudSolrClientBuilder(
             Collections.singletonList(zkHost), Optional.empty())
         .withHttpClient(httpClient)
-        .sendUpdatesToAllReplicasInShard()
+        .sendUpdatesToAnyReplica()
         .build();
   }
 
@@ -2804,7 +2804,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     return new RandomizingCloudSolrClientBuilder(
             Collections.singletonList(zkHost), Optional.empty())
         .withHttpClient(httpClient)
-        .sendUpdatesToAllReplicasInShard()
+        .sendUpdatesToAnyReplica()
         .withConnectionTimeout(connectionTimeoutMillis)
         .withSocketTimeout(socketTimeoutMillis)
         .build();
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java b/solr/test-framework/src/java/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
similarity index 100%
rename from solr/core/src/test/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
rename to solr/test-framework/src/java/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java b/solr/test-framework/src/java/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
similarity index 100%
rename from solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
rename to solr/test-framework/src/java/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java