You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2023/01/20 19:30:46 UTC
[solr] branch main updated: SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.
This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new afb980bc217 SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.
afb980bc217 is described below
commit afb980bc217cb492b24e002c99695e28794aceb9
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Fri Jan 20 12:30:33 2023 -0700
SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false.
This behavior can also be configured per AbstractUpdateRequest instance.
---
solr/CHANGES.txt | 4 +
.../client/solrj/impl/CloudHttp2SolrClient.java | 30 +
.../client/solrj/impl/CloudLegacySolrClient.java | 40 +-
.../solr/client/solrj/impl/CloudSolrClient.java | 35 +-
.../solrj/request/AbstractUpdateRequest.java | 11 +
.../solr/client/solrj/request/IsUpdateRequest.java | 15 +-
.../configsets/tracking-updates/conf/schema.xml | 29 +
.../tracking-updates/conf/solrconfig.xml | 63 +++
.../solrj/impl/CloudHttp2SolrClientTest.java | 1 +
.../client/solrj/impl/CloudSolrClientTest.java | 1 +
.../impl/SendUpdatesToLeadersOverrideTest.java | 624 +++++++++++++++++++++
.../src/java/org/apache/solr/SolrTestCaseJ4.java | 10 +-
.../processor/RecordingUpdateProcessorFactory.java | 0
.../processor/TrackingUpdateProcessorFactory.java | 0
14 files changed, 844 insertions(+), 19 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 16b26894cc0..9343a82fe57 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -123,6 +123,10 @@ Improvements
* SOLR-15772: More visible security warnings in Admin UI (janhoy)
+* SOLR-6312: SolrJ Cloud clients now correctly support isUpdatesToLeaders() being false. This behavior can also be
+ configured per AbstractUpdateRequest instance. (hossman)
+
+
Optimizations
---------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index fa602215d26..89a08280535 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -206,12 +206,39 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
}
+ /**
+ * Tells {@link Builder} that created clients should be configured such that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
+ *
+ * @see #sendUpdatesToAnyReplica
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ public Builder sendUpdatesOnlyToShardLeaders() {
+ shardLeadersOnly = true;
+ return this;
+ }
+
+ /**
+ * Tells {@link Builder} that created clients should be configured such that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
+ *
+ * @see #sendUpdatesOnlyToShardLeaders
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ public Builder sendUpdatesToAnyReplica() {
+ shardLeadersOnly = false;
+ return this;
+ }
+
/**
* Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to
* shard leaders only.
*
* <p>UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a
* {@link SolrException}
+ *
+ * @see #sendDirectUpdatesToAnyShardReplica
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
*/
public Builder sendDirectUpdatesToShardLeadersOnly() {
directUpdatesToLeadersOnly = true;
@@ -224,6 +251,9 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
*
* <p>Shard leaders are still preferred, but the created clients will fallback to using other
* replicas if a leader cannot be found.
+ *
+ * @see #sendDirectUpdatesToShardLeadersOnly
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
*/
public Builder sendDirectUpdatesToAnyShardReplica() {
directUpdatesToLeadersOnly = false;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
index 983d959434c..eac2326a330 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudLegacySolrClient.java
@@ -283,9 +283,11 @@ public class CloudLegacySolrClient extends CloudSolrClient {
}
/**
- * Tells {@link Builder} that created clients should send updates only to shard leaders.
+ * Tells {@link Builder} that created clients should be configured such that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
*
- * <p>WARNING: This method currently has no effect. See SOLR-6312 for more information.
+ * @see #sendUpdatesToAnyReplica
+ * @see CloudSolrClient#isUpdatesToLeaders
*/
public Builder sendUpdatesOnlyToShardLeaders() {
shardLeadersOnly = true;
@@ -293,20 +295,45 @@ public class CloudLegacySolrClient extends CloudSolrClient {
}
/**
- * Tells {@link Builder} that created clients should send updates to all replicas for a shard.
+ * Tells {@link Builder} that created clients should be configured such that {@link
+ * CloudSolrClient#isUpdatesToLeaders} returns <code>false</code>.
*
- * <p>WARNING: This method currently has no effect. See SOLR-6312 for more information.
+ * @see #sendUpdatesOnlyToShardLeaders
+ * @see CloudSolrClient#isUpdatesToLeaders
*/
- public Builder sendUpdatesToAllReplicasInShard() {
+ public Builder sendUpdatesToAnyReplica() {
shardLeadersOnly = false;
return this;
}
+ /**
+ * This method has no effect.
+ *
+ * <p>In older versions of Solr, this method was an incorrectly named equivalent to {@link
+ * #sendUpdatesToAnyReplica}, which had no effect because that setting was ignored in the
+ * created clients. When the underlying {@link CloudSolrClient} behavior was fixed, this method
+ * was modified to be an explicit No-Op, since the implied behavior of sending updates to
+ * <em>all</em> replicas has never been supported, and was never intended to be supported.
+ *
+ * @see #sendUpdatesOnlyToShardLeaders
+ * @see #sendUpdatesToAnyReplica
+ * @see CloudSolrClient#isUpdatesToLeaders
+ * @see <a href="https://issues.apache.org/jira/browse/SOLR-6312">SOLR-6312</a>
+ * @deprecated Never supported
+ */
+ @Deprecated
+ public Builder sendUpdatesToAllReplicasInShard() {
+ return this;
+ }
+
/**
* Tells {@link Builder} that created clients should send direct updates to shard leaders only.
*
* <p>UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a
* {@link SolrException}
+ *
+ * @see #sendDirectUpdatesToAnyShardReplica
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
*/
public Builder sendDirectUpdatesToShardLeadersOnly() {
directUpdatesToLeadersOnly = true;
@@ -319,6 +346,9 @@ public class CloudLegacySolrClient extends CloudSolrClient {
*
* <p>Shard leaders are still preferred, but the created clients will fallback to using other
* replicas if a leader cannot be found.
+ *
+ * @see #sendDirectUpdatesToShardLeadersOnly
+ * @see CloudSolrClient#isDirectUpdatesToLeadersOnly
*/
public Builder sendDirectUpdatesToAnyShardReplica() {
directUpdatesToLeadersOnly = false;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 9c1c1db83ed..5a146e70a81 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -847,11 +847,10 @@ public abstract class CloudSolrClient extends SolrClient {
isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
}
boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
- boolean isUpdate = (request instanceof IsUpdateRequest) && (request instanceof UpdateRequest);
if (!inputCollections.isEmpty()
&& !isAdmin
&& !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
- Set<String> requestedCollectionNames = resolveAliases(inputCollections, isUpdate);
+ Set<String> requestedCollectionNames = resolveAliases(inputCollections);
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
@@ -1048,11 +1047,12 @@ public abstract class CloudSolrClient extends SolrClient {
connect();
boolean sendToLeaders = false;
- boolean isUpdate = false;
if (request instanceof IsUpdateRequest) {
- if (request instanceof UpdateRequest) {
- isUpdate = true;
+ sendToLeaders = ((IsUpdateRequest) request).isSendToLeaders() && this.isUpdatesToLeaders();
+
+ // Check if we can do a "directUpdate" ...
+ if (sendToLeaders && request instanceof UpdateRequest) {
if (inputCollections.size() > 1) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
@@ -1069,7 +1069,6 @@ public abstract class CloudSolrClient extends SolrClient {
return response;
}
}
- sendToLeaders = true;
}
SolrParams reqParams = request.getParams();
@@ -1099,7 +1098,7 @@ public abstract class CloudSolrClient extends SolrClient {
}
} else { // Typical...
- Set<String> collectionNames = resolveAliases(inputCollections, isUpdate);
+ Set<String> collectionNames = resolveAliases(inputCollections);
if (collectionNames.isEmpty()) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
@@ -1194,7 +1193,7 @@ public abstract class CloudSolrClient extends SolrClient {
* Resolves the input collections to their possible aliased collections. Doesn't validate
* collection existence.
*/
- private Set<String> resolveAliases(List<String> inputCollections, boolean isUpdate) {
+ private Set<String> resolveAliases(List<String> inputCollections) {
if (inputCollections.isEmpty()) {
return Collections.emptySet();
}
@@ -1210,11 +1209,31 @@ public abstract class CloudSolrClient extends SolrClient {
return uniqueNames;
}
+ /**
+ * If true, this client has been configured such that it will generally prefer to send {@link
+ * IsUpdateRequest} requests to a shard leader, if and only if {@link
+ * IsUpdateRequest#isSendToLeaders} is also true. If false, then this client has been configured
+ * to obey normal routing preferences when dealing with {@link IsUpdateRequest} requests.
+ *
+ * @see #isDirectUpdatesToLeadersOnly
+ */
public boolean isUpdatesToLeaders() {
return updatesToLeaders;
}
/**
+ * If true, this client has been configured such that "direct updates" will <em>only</em> be sent
+ * to the current leader of the corresponding shard, and will not be retried with other replicas.
+ * This setting has no effect if {@link #isUpdatesToLeaders()} or {@link
+ * IsUpdateRequest#isSendToLeaders} returns false.
+ *
+ * <p>A "direct update" is any update that can be sent directly to a single shard, and does not
+ * need to be broadcast to every shard. (Example: document updates or "delete by id" when using
+ * the default router; non-direct updates are things like commits and "delete by query").
+ *
+ * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct updates" for different
+ * shards, this client may break the request up and merge the responses.
+ *
* @return true if direct updates are sent to shard leaders only
*/
public boolean isDirectUpdatesToLeadersOnly() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
index 352fd290f59..733502353dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
@@ -159,4 +159,15 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse>
this.commitWithin = commitWithin;
return this;
}
+
+ private boolean sendToLeaders = true;
+
+ public boolean isSendToLeaders() {
+ return sendToLeaders;
+ }
+
+ public AbstractUpdateRequest setSendToLeaders(final boolean sendToLeaders) {
+ this.sendToLeaders = sendToLeaders;
+ return this;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
index 5e8c8aff854..54f449e1358 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
@@ -16,5 +16,18 @@
*/
package org.apache.solr.client.solrj.request;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
/** Marker class so that we can determine which requests are updates. */
-public interface IsUpdateRequest {}
+public interface IsUpdateRequest {
+
+ /**
+ * Indicates if clients should make attempts to route this request to a shard leader, overriding
+ * typical client routing preferences for requests. Defaults to true.
+ *
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ default boolean isSendToLeaders() {
+ return true;
+ }
+}
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml
new file mode 100644
index 00000000000..4124feab0c3
--- /dev/null
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+ <fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="true" />
+ <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml
new file mode 100644
index 00000000000..d6df57ba134
--- /dev/null
+++ b/solr/solrj/src/test-files/solrj/solr/configsets/tracking-updates/conf/solrconfig.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with tracking enabled on the URP, both before and after distrib -->
+
+<config>
+
+ <updateRequestProcessorChain default="true">
+ <processor class="solr.LogUpdateProcessorFactory" />
+ <processor class="solr.TrackingUpdateProcessorFactory">
+ <str name="group">pre-distrib</str>
+ </processor>
+ <processor class="solr.DistributedUpdateProcessorFactory" />
+ <processor class="solr.TrackingUpdateProcessorFactory">
+ <str name="group">post-distrib</str>
+ </processor>
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+ <indexConfig>
+ <mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
+ </indexConfig>
+</config>
+
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index f994a216c6b..92409833464 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -304,6 +304,7 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
try (CloudSolrClient threadedClient =
new RandomizingCloudSolrClientBuilder(
Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesOnlyToShardLeaders()
.withParallelUpdates(true)
.build()) {
threadedClient.setDefaultCollection("routing_collection");
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 61d2a60aaec..125c8a3580d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -305,6 +305,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
try (CloudSolrClient threadedClient =
new RandomizingCloudSolrClientBuilder(
Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesOnlyToShardLeaders()
.withParallelUpdates(true)
.build()) {
threadedClient.setDefaultCollection("routing_collection");
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
new file mode 100644
index 00000000000..12009c6592f
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.update.processor.TrackingUpdateProcessorFactory;
+import org.hamcrest.MatcherAssert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behavior of {@link CloudSolrClient#isUpdatesToLeaders} and {@link
+ * IsUpdateRequest#isSendToLeaders}.
+ *
+ * <p>This class uses {@link TrackingUpdateProcessorFactory} instances (configured both before, and
+ * after the <code>distrib</code> processor) to inspect which replicas receive various {@link
+ * UpdateRequest}s from variously configured {@link CloudSolrClient}s. In some requests, <code>
+ * shards.preference=replica.type:PULL</code> is specified to confirm that typical routing
+ * preferences are respected (when the effective value of <code>isSendToLeaders</code> is <code>
+ * false</code>)
+ */
+public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String CONFIG = "tracking-updates";
+ private static final String COLLECTION_NAME = "the_collection";
+
+ private static final Set<String> LEADER_CORE_NAMES = new HashSet<>();
+ private static final Set<String> PULL_REPLICA_CORE_NAMES = new HashSet<>();
+
+ @AfterClass
+ public static void cleanupExpectedCoreNames() throws Exception {
+ LEADER_CORE_NAMES.clear();
+ PULL_REPLICA_CORE_NAMES.clear();
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ assert LEADER_CORE_NAMES.isEmpty();
+ assert PULL_REPLICA_CORE_NAMES.isEmpty();
+
+ final int numNodes = 4;
+ configureCluster(numNodes)
+ .addConfig(
+ CONFIG,
+ getFile("solrj")
+ .toPath()
+ .resolve("solr")
+ .resolve("configsets")
+ .resolve(CONFIG)
+ .resolve("conf"))
+ .configure();
+
+ // create 2 shard collection with 1 NRT (leader) and 1 PULL replica
+ assertTrue(
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, CONFIG, 2, 1)
+ .setPullReplicas(1)
+ .setNrtReplicas(1)
+ .process(cluster.getSolrClient())
+ .isSuccess());
+
+ final List<Replica> allReplicas =
+ cluster.getSolrClient().getClusterState().getCollection(COLLECTION_NAME).getReplicas();
+ assertEquals(
+ "test preconditions were broken, each replica should have it's own node",
+ numNodes,
+ allReplicas.size());
+
+ allReplicas.stream()
+ .filter(Replica::isLeader)
+ .map(Replica::getCoreName)
+ .collect(Collectors.toCollection(() -> LEADER_CORE_NAMES));
+
+ allReplicas.stream()
+ .filter(r -> Replica.Type.PULL.equals(r.getType()))
+ .map(Replica::getCoreName)
+ .collect(Collectors.toCollection(() -> PULL_REPLICA_CORE_NAMES));
+
+ log.info("Leader coreNames={}", LEADER_CORE_NAMES);
+ log.info("PULL Replica coreNames={}", PULL_REPLICA_CORE_NAMES);
+ }
+
+ /**
+ * Helper that stops recording and returns an unmodifiable list of the core names from each
+ * recorded command
+ */
+ private static List<String> stopRecording(final String group) {
+ return TrackingUpdateProcessorFactory.stopRecording(group).stream()
+ .map(
+ uc ->
+ uc.getReq()
+ .getContext()
+ .get(TrackingUpdateProcessorFactory.REQUEST_NODE)
+ .toString())
+ .collect(Collectors.toUnmodifiableList());
+ }
+
+ /** Convenience class for making assertions about the updates that were processed */
+ private static class RecordingResults {
+ public final List<UpdateCommand> preDistribCommands;
+ public final List<UpdateCommand> postDistribCommands;
+
+ public final Map<SolrQueryRequest, List<UpdateCommand>> preDistribRequests;
+ public final Map<SolrQueryRequest, List<UpdateCommand>> postDistribRequests;
+
+ public final Map<String, List<SolrQueryRequest>> preDistribCores;
+ public final Map<String, List<SolrQueryRequest>> postDistribCores;
+
+ private static Map<SolrQueryRequest, List<UpdateCommand>> mapReqsToCommands(
+ final List<UpdateCommand> commands) {
+ return commands.stream().collect(Collectors.groupingBy(UpdateCommand::getReq));
+ }
+
+ private static Map<String, List<SolrQueryRequest>> mapCoresToReqs(
+ final Collection<SolrQueryRequest> reqs) {
+ return reqs.stream()
+ .collect(
+ Collectors.groupingBy(
+ r -> r.getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE).toString()));
+ }
+
+ public RecordingResults(
+ final List<UpdateCommand> preDistribCommands,
+ final List<UpdateCommand> postDistribCommands) {
+ this.preDistribCommands = preDistribCommands;
+ this.postDistribCommands = postDistribCommands;
+
+ this.preDistribRequests = mapReqsToCommands(preDistribCommands);
+ this.postDistribRequests = mapReqsToCommands(postDistribCommands);
+
+ this.preDistribCores = mapCoresToReqs(preDistribRequests.keySet());
+ this.postDistribCores = mapCoresToReqs(postDistribRequests.keySet());
+ }
+ }
+
+ /**
+ * Given an {@link AbstractUpdateRequest} and a {@link SolrClient}, processes that request against
+ * that client while {@link TrackingUpdateProcessorFactory} is recording, does some basic
+ * validation, then returns the recorded <code>pre-distrib</code> and <code>post-distrib</code>
+ * commands wrapped in a {@link RecordingResults}
+ */
+ private static RecordingResults assertUpdateWithRecording(
+ final AbstractUpdateRequest req, final SolrClient client) throws Exception {
+
+ TrackingUpdateProcessorFactory.startRecording("pre-distrib");
+ TrackingUpdateProcessorFactory.startRecording("post-distrib");
+
+ assertEquals(0, req.process(client, COLLECTION_NAME).getStatus());
+
+ final RecordingResults results =
+ new RecordingResults(
+ TrackingUpdateProcessorFactory.stopRecording("pre-distrib"),
+ TrackingUpdateProcessorFactory.stopRecording("post-distrib"));
+
+ // post-distrib should never match any PULL replicas, regardless of request, if this fails
+ // something is seriously wrong with our cluster
+ MatcherAssert.assertThat(
+ "post-distrib should never be PULL replica",
+ results.postDistribCores.keySet(),
+ everyItem(not(isIn(PULL_REPLICA_CORE_NAMES))));
+
+ return results;
+ }
+
+ /**
+ * Since {@link AbstractUpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for
+ * setting <code>shards.preference=replica.type:PULL</code> on the input req, and then returning
+ * that req
+ */
+ private static AbstractUpdateRequest prefPull(final AbstractUpdateRequest req) {
+ req.setParam("shards.preference", "replica.type:PULL");
+ return req;
+ }
+
+ public void testBuilderImplicitBehavior() throws Exception {
+ try (CloudSolrClient client =
+ new CloudLegacySolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .build()) {
+ assertTrue(client.isUpdatesToLeaders());
+ }
+ try (CloudSolrClient client =
+ new CloudHttp2SolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .build()) {
+ assertTrue(client.isUpdatesToLeaders());
+ }
+ }
+
+ public void testLegacyClientThatDefaultsToLeaders() throws Exception {
+ try (CloudSolrClient client =
+ new CloudLegacySolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesOnlyToShardLeaders()
+ .build()) {
+ checkUpdatesDefaultToLeaders(client);
+ checkUpdatesWithSendToLeadersFalse(client);
+ }
+ }
+
+ public void testLegacyClientThatDoesNotDefaultToLeaders() throws Exception {
+ try (CloudSolrClient client =
+ new CloudLegacySolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesToAnyReplica()
+ .build()) {
+ checkUpdatesWithShardsPrefPull(client);
+ checkUpdatesWithSendToLeadersFalse(client);
+ }
+ }
+
+ public void testHttp2ClientThatDefaultsToLeaders() throws Exception {
+ try (CloudSolrClient client =
+ new CloudHttp2SolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesOnlyToShardLeaders()
+ .build()) {
+ checkUpdatesDefaultToLeaders(client);
+ checkUpdatesWithSendToLeadersFalse(client);
+ }
+ }
+
+ public void testHttp2ClientThatDoesNotDefaultToLeaders() throws Exception {
+ try (CloudSolrClient client =
+ new CloudHttp2SolrClient.Builder(
+ Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+ .sendUpdatesToAnyReplica()
+ .build()) {
+ checkUpdatesWithShardsPrefPull(client);
+ checkUpdatesWithSendToLeadersFalse(client);
+ }
+ }
+
+ /**
+ * Given a SolrClient, sends various updates and asserts expectations regarding default behavior:
+ * that these requests will be initially sent to shard leaders, and "routed" requests will be sent
+ * to the leader for that route's shard
+ */
+ private void checkUpdatesDefaultToLeaders(final CloudSolrClient client) throws Exception {
+ assertTrue(
+ "broken test, only valid on clients where updatesToLeaders=true",
+ client.isUpdatesToLeaders());
+
+ { // single doc add is routable and should go to a single shard
+ final RecordingResults add =
+ assertUpdateWithRecording(new UpdateRequest().add(sdoc("id", "hoss")), client);
+
+ // single NRT leader is only core that should be involved at all
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+ MatcherAssert.assertThat(
+ "add pre-distrib must be leader",
+ add.preDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ assertEquals(
+ "add pre and post should match",
+ add.preDistribCores.keySet(),
+ add.postDistribCores.keySet());
+ assertEquals(
+ "add pre and post should be exact same reqs",
+ add.preDistribRequests.keySet(),
+ add.postDistribRequests.keySet());
+ // NOTE: we can't assert the pre/post commands are the same, because they add versioning
+
+ // whatever leader our add was routed to, a DBI for the same id should go to the same leader
+ final RecordingResults del =
+ assertUpdateWithRecording(new UpdateRequest().deleteById("hoss"), client);
+ assertEquals(
+ "del pre and post should match",
+ del.preDistribCores.keySet(),
+ del.postDistribCores.keySet());
+ assertEquals(
+ "add and del should have been routed the same",
+ add.preDistribCores.keySet(),
+ del.preDistribCores.keySet());
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+ }
+
+ { // DBQ should start on some leader, and then distrib to both leaders
+ final RecordingResults record =
+ assertUpdateWithRecording(new UpdateRequest().deleteByQuery("*:*"), client);
+
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib must be leader",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "dbq post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "dbq post-distrib size",
+ record.postDistribRequests.keySet(),
+ hasSize(LEADER_CORE_NAMES.size()));
+ MatcherAssert.assertThat(
+ "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+ }
+
+ { // When we have multiple direct updates for different shards, client will
+ // split them and merge the responses.
+ //
+ // But we should still only see at most one pre request per shard leader
+
+ final RecordingResults record =
+ assertUpdateWithRecording(prefPull(createMultiDirectUpdates(100, 10)), client);
+
+ // NOTE: Don't assume our docIds are spread across multi-shards...
+ // ...but the original number of requests should all be diff leaders
+ MatcherAssert.assertThat(
+ "multi pre-distrib must be leaders",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "multi pre-distrib req != pre-distrib num cores",
+ record.preDistribRequests.keySet(),
+ hasSize(record.preDistribCores.keySet().size()));
+ MatcherAssert.assertThat(
+ "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+ assertEquals(
+ "multi post-distrib must be same leaders",
+ record.preDistribCores.keySet(),
+ record.postDistribCores.keySet());
+
+ // NOTE: we make no assertion about number of post-distrib requests, just commands
+ // (distrib proc may batch differently than what we send)
+ assertEquals(
+ "multi post-distrib cores don't match pre-distrib cores",
+ record.preDistribCores.keySet(),
+ record.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+ }
+ }
+
+ /**
+ * Given a SolrClient, sends various updates using {@link #prefPull} and asserts expectations that
+ * these requests will be initially sent to PULL replicas
+ */
+ private void checkUpdatesWithShardsPrefPull(final CloudSolrClient client) throws Exception {
+
+ assertFalse(
+ "broken test, only valid on clients where updatesToLeaders=false",
+ client.isUpdatesToLeaders());
+
+ { // single doc add...
+ final RecordingResults add =
+ assertUpdateWithRecording(prefPull(new UpdateRequest().add(sdoc("id", "hoss"))), client);
+
+ // ...should start on (some) PULL replica, since we asked nicely
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "add pre-distrib must be PULL",
+ add.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+ // ...then be routed to single leader for this id
+ MatcherAssert.assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "add post-distrib must be leader",
+ add.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+ // A DBI should also start on (some) PULL replica, since we asked nicely.
+ //
+ // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+ final RecordingResults del =
+ assertUpdateWithRecording(prefPull(new UpdateRequest().deleteById("hoss")), client);
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "del pre-distrib must be PULL",
+ del.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "add and del should have same post-distrib leader",
+ add.postDistribCores.keySet(),
+ del.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+ }
+
+ { // DBQ start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+ final RecordingResults record =
+ assertUpdateWithRecording(prefPull(new UpdateRequest().deleteByQuery("*:*")), client);
+
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "dbq post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "dbq post-distrib size",
+ record.postDistribRequests.keySet(),
+ hasSize(LEADER_CORE_NAMES.size()));
+ MatcherAssert.assertThat(
+ "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+ }
+
+ { // When sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+ // should still only go to one replica for all the "pre" commands, then be forwarded
+ // to the respective leaders for the "post" commands
+
+ final RecordingResults record =
+ assertUpdateWithRecording(prefPull(createMultiDirectUpdates(100, 10)), client);
+
+ MatcherAssert.assertThat(
+ "multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "multi pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+ assertEquals(
+ "multi post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ // NOTE: Don't assume our docIds are spread across multi-shards...
+ //
+ // We make no assertion about number of post-distrib requests
+ // (distrib proc may batch differently than what we send)
+ MatcherAssert.assertThat(
+ "multi post-distrib cores",
+ record.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+ }
+ }
+
+ /**
+ * Given a SolrClient, sends various updates where {@link IsUpdateRequest#isSendToLeaders} returns
+ * false, and asserts expectations that requests using {@link #prefPull} are all sent to PULL
+ * replicas, regardless of how the client is configured.
+ */
+ private void checkUpdatesWithSendToLeadersFalse(final CloudSolrClient client) throws Exception {
+ { // single doc add...
+ final RecordingResults add =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().add(sdoc("id", "hoss"))).setSendToLeaders(false),
+ client);
+
+ // ...should start on (some) PULL replica, since we asked nicely
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "add pre-distrib must be PULL",
+ add.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+ // ...then be routed to single leader for this id
+ MatcherAssert.assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "add post-distrib must be leader",
+ add.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+ // A DBI should also start on (some) PULL replica, since we asked nicely.
+ //
+ // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+ final RecordingResults del =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().deleteById("hoss")).setSendToLeaders(false), client);
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "del pre-distrib must be PULL",
+ del.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "add and del should have same post-distrib leader",
+ add.postDistribCores.keySet(),
+ del.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+ }
+
+ { // DBQ start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+ final RecordingResults record =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().deleteByQuery("*:*")).setSendToLeaders(false), client);
+
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "dbq post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ MatcherAssert.assertThat(
+ "dbq post-distrib size",
+ record.postDistribRequests.keySet(),
+ hasSize(LEADER_CORE_NAMES.size()));
+ MatcherAssert.assertThat(
+ "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+ }
+
+ { // When sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+ // should still only go to one replica for all the "pre" commands, then be forwarded
+ // to the respective leaders for the "post" commands
+
+ final RecordingResults record =
+ assertUpdateWithRecording(
+ prefPull(createMultiDirectUpdates(100, 10)).setSendToLeaders(false), client);
+
+ MatcherAssert.assertThat(
+ "multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "multi pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+ MatcherAssert.assertThat(
+ "multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+ assertEquals(
+ "multi post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ // NOTE: Don't assume our docIds are spread across multi-shards...
+ //
+ // We make no assertion about number of post-distrib requests
+ // (distrib proc may batch differently than what we send)
+ MatcherAssert.assertThat(
+ "multi post-distrib cores",
+ record.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ MatcherAssert.assertThat(
+ "multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+ }
+ }
+
+ private static UpdateRequest createMultiDirectUpdates(final int numAdds, final int numDel) {
+ final UpdateRequest req = new UpdateRequest();
+ for (int i = 0; i < numAdds; i++) {
+ req.add(sdoc("id", "add" + i));
+ }
+ for (int i = 0; i < numDel; i++) {
+ req.deleteById("del" + i);
+ }
+ return req;
+ }
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index f05af27d6f7..6ae2eb76a2e 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -2704,7 +2704,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
}
return new RandomizingCloudSolrClientBuilder(
Collections.singletonList(zkHost), Optional.empty())
- .sendUpdatesToAllReplicasInShard()
+ .sendUpdatesToAnyReplica()
.build();
}
@@ -2729,7 +2729,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
}
return new RandomizingCloudSolrClientBuilder(
Collections.singletonList(zkHost), Optional.empty())
- .sendUpdatesToAllReplicasInShard()
+ .sendUpdatesToAnyReplica()
.withSocketTimeout(socketTimeoutMillis)
.build();
}
@@ -2754,7 +2754,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
}
return new RandomizingCloudSolrClientBuilder(
Collections.singletonList(zkHost), Optional.empty())
- .sendUpdatesToAllReplicasInShard()
+ .sendUpdatesToAnyReplica()
.withConnectionTimeout(connectionTimeoutMillis)
.withSocketTimeout(socketTimeoutMillis)
.build();
@@ -2777,7 +2777,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
return new RandomizingCloudSolrClientBuilder(
Collections.singletonList(zkHost), Optional.empty())
.withHttpClient(httpClient)
- .sendUpdatesToAllReplicasInShard()
+ .sendUpdatesToAnyReplica()
.build();
}
@@ -2804,7 +2804,7 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
return new RandomizingCloudSolrClientBuilder(
Collections.singletonList(zkHost), Optional.empty())
.withHttpClient(httpClient)
- .sendUpdatesToAllReplicasInShard()
+ .sendUpdatesToAnyReplica()
.withConnectionTimeout(connectionTimeoutMillis)
.withSocketTimeout(socketTimeoutMillis)
.build();
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java b/solr/test-framework/src/java/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
similarity index 100%
rename from solr/core/src/test/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
rename to solr/test-framework/src/java/org/apache/solr/update/processor/RecordingUpdateProcessorFactory.java
diff --git a/solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java b/solr/test-framework/src/java/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
similarity index 100%
rename from solr/core/src/test/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java
rename to solr/test-framework/src/java/org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java