You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2023/01/07 00:54:17 UTC
[solr] 08/12: Add IsUpdateRequest.isSendToLeaders() such that both it and client config must be true for shards.preference to be overridden
This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch jira/SOLR-6312
in repository https://gitbox.apache.org/repos/asf/solr.git
commit 345afd12fa2d03b795bcc6de0ee1639476e2e683
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Thu Jan 5 15:26:46 2023 -0700
Add IsUpdateRequest.isSendToLeaders() such that both it and client config must be true for shards.preference to be overridden
---
.../solr/client/solrj/impl/CloudSolrClient.java | 24 +++-
.../solrj/request/AbstractUpdateRequest.java | 11 ++
.../solr/client/solrj/request/IsUpdateRequest.java | 15 ++-
.../impl/SendUpdatesToLeadersOverrideTest.java | 135 +++++++++++++++++++--
4 files changed, 170 insertions(+), 15 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index c05fcf70b70..900aaa9108a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -1044,9 +1044,7 @@ public abstract class CloudSolrClient extends SolrClient {
boolean sendToLeaders = false;
if (request instanceof IsUpdateRequest) {
- sendToLeaders =
- // nocommit: also check request.isSendToLeaders() (still to be added)
- this.isUpdatesToLeaders();
+ sendToLeaders = ((IsUpdateRequest) request).isSendToLeaders() && this.isUpdatesToLeaders();
// Check if we can do a "directUpdate" ...
if (sendToLeaders && request instanceof UpdateRequest) {
@@ -1206,11 +1204,31 @@ public abstract class CloudSolrClient extends SolrClient {
return uniqueNames;
}
+ /**
+ * If true, this client has been configured such that it will generally prefer to send {@link
+ * IsUpdateRequest} requests to a shard leader, if and only if {@link
+ * IsUpdateRequest#isSendToLeaders} is also true. If false, then this client has been configured
+ * to obey normal routing preferences when dealing with {@link IsUpdateRequest} requests.
+ *
+ * @see #isDirectUpdatesToLeadersOnly
+ */
public boolean isUpdatesToLeaders() {
return updatesToLeaders;
}
/**
+ * If true, this client has been configured such that "direct updates" will <em>only</em> be sent
+ * to the current leader of the corrisponding shard, and will not be retried with other replicas.
+ * This method has no effect if {@link #isUpdatesToLeaders()} or {@link
+ * IsUpdateRequest#isSendToLeaders} returns false.
+ *
+ * <p>A "direct update" is any update that can be sent directly to a single shard, and does not
+ * need to be broadcast to every shard. (Example: document updates or "delete by id" when using
+ * the default router; non-direct updates are things like commits and "delete by query").
+ *
+ * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct updates" for different
+ * shards, this client may break the request up and merge th resposes.
+ *
* @return true if direct updates are sent to shard leaders only
*/
public boolean isDirectUpdatesToLeadersOnly() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
index 352fd290f59..733502353dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
@@ -159,4 +159,15 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse>
this.commitWithin = commitWithin;
return this;
}
+
+ private boolean sendToLeaders = true;
+
+ public boolean isSendToLeaders() {
+ return sendToLeaders;
+ }
+
+ public AbstractUpdateRequest setSendToLeaders(final boolean sendToLeaders) {
+ this.sendToLeaders = sendToLeaders;
+ return this;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
index 5e8c8aff854..54f449e1358 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
@@ -16,5 +16,18 @@
*/
package org.apache.solr.client.solrj.request;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
/** Marker class so that we can determine which requests are updates. */
-public interface IsUpdateRequest {}
+public interface IsUpdateRequest {
+
+ /**
+ * Indicates if clients should make attempts to route this request to a shard leader, overriding
+ * typical client routing preferences for requests. Defaults to true.
+ *
+ * @see CloudSolrClient#isUpdatesToLeaders
+ */
+ default boolean isSendToLeaders() {
+ return true;
+ }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
index 0daaa76dcb0..044c24f339d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
@@ -165,13 +166,13 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
}
/**
- * Given an {@link UpdateRequest} and a {@link SolrClient}, processes that request against that
- * client while {@link TrackingUpdateProcessorFactory} is recording, does some basic validation,
- * then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code> coreNames to
- * the specified validators
+ * Given an {@link AbstractUpdateRequest} and a {@link SolrClient}, processes that request against
+ * that client while {@link TrackingUpdateProcessorFactory} is recording, does some basic
+ * validation, then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code>
+ * coreNames to the specified validators
*/
private static RecordingResults assertUpdateWithRecording(
- final UpdateRequest req, final SolrClient client) throws Exception {
+ final AbstractUpdateRequest req, final SolrClient client) throws Exception {
TrackingUpdateProcessorFactory.startRecording("pre-distrib");
TrackingUpdateProcessorFactory.startRecording("post-distrib");
@@ -194,10 +195,11 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
}
/**
- * Since {@link UpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for setting
- * <code>shards.preference=replica.type:PULL</code> on the input req, and then returning that req
+ * Since {@link AbstractUpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for
+ * setting <code>shards.preference=replica.type:PULL</code> on the input req, and then returning
+ * that req
*/
- private static UpdateRequest prefPull(final UpdateRequest req) {
+ private static AbstractUpdateRequest prefPull(final AbstractUpdateRequest req) {
req.setParam("shards.preference", "replica.type:PULL");
return req;
}
@@ -205,17 +207,18 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
// nocommit: - test CloudHttp2SolrClient as well
// basic sanity check of expected default behavior
- public void testUpdatesDefaultToLeaders() throws Exception {
+ public void testClientThatDefaultsToLeaders() throws Exception {
try (CloudSolrClient client =
new CloudLegacySolrClient.Builder(
Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
.sendUpdatesOnlyToShardLeaders()
.build()) {
checkUpdatesDefaultToLeaders(client);
+ checkUpdatesWithSendToLeadersFalse(client);
}
}
- public void testUpdatesWithShardsPrefPull() throws Exception {
+ public void testClientThatDoesNotDefaultToLeaders() throws Exception {
try (CloudSolrClient client =
new CloudLegacySolrClient.Builder(
Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
@@ -224,6 +227,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
.sendUpdatesToAllReplicasInShard()
.build()) {
checkUpdatesWithShardsPrefPull(client);
+ checkUpdatesWithSendToLeadersFalse(client);
}
}
@@ -334,7 +338,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
}
/**
- * Given a SolrClient, sends various updates using {#link #prefPull} and asserts expecations that
+ * Given a SolrClient, sends various updates using {@link #prefPull} and asserts expecations that
* these requests will be initially sent to PULL replcias
*/
private void checkUpdatesWithShardsPrefPull(final CloudSolrClient client) throws Exception {
@@ -441,6 +445,115 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
}
}
+ /**
+ * Given a SolrClient, sends various updates were {@link IsUpdateRequest#isSendToLeaders} returns
+ * false, and asserts expectations that requess using {@link #prefPull} are all sent to PULL
+ * replicas, regardless of how the client is configured.
+ */
+ private void checkUpdatesWithSendToLeadersFalse(final CloudSolrClient client) throws Exception {
+ { // single doc add...
+ final RecordingResults add =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().add(sdoc("id", "hoss"))).setSendToLeaders(false),
+ client);
+
+ // ...should start on (some) PULL replica, since we asked nicely
+ assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+ assertThat(
+ "add pre-distrib must be PULL",
+ add.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+ assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+ // ...then be routed to single leader for this id
+ assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+ assertThat(
+ "add post-distrib must be leader",
+ add.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ assertThat("add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+ assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+ // A DBI should also start on (some) PULL replica, since we asked nicely.
+ //
+ // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+ final RecordingResults del =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().deleteById("hoss")).setSendToLeaders(false), client);
+ assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+ assertThat(
+ "del pre-distrib must be PULL",
+ del.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+ assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "add and del should have same post-distrib leader",
+ add.postDistribCores.keySet(),
+ del.postDistribCores.keySet());
+ assertThat("del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+ assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+ }
+
+ { // DBQ start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+ final RecordingResults record =
+ assertUpdateWithRecording(
+ prefPull(new UpdateRequest().deleteByQuery("*:*")).setSendToLeaders(false), client);
+
+ assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ assertThat(
+ "dbq pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ assertThat("dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+ assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+ assertEquals(
+ "dbq post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ assertThat(
+ "dbq post-distrib size",
+ record.postDistribRequests.keySet(),
+ hasSize(LEADER_CORE_NAMES.size()));
+ assertThat(
+ "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+ }
+
+ { // When we sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+ // should still only go to one replica for all the "pre" commands, then be forwarded
+ // the respective leaders for the "post" commands
+
+ final RecordingResults record =
+ assertUpdateWithRecording(
+ prefPull(createMultiDirectUpdates(100, 10)).setSendToLeaders(false), client);
+
+ assertThat("multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+ assertThat(
+ "multi pre-distrib must be PULL",
+ record.preDistribCores.keySet(),
+ everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+ assertThat("multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+ assertThat("multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+ assertEquals(
+ "multi post-distrib must be all leaders",
+ LEADER_CORE_NAMES,
+ record.postDistribCores.keySet());
+ // NOTE: Don't assume our docIds are spread across multi-shards...
+ //
+ // We make no asertion about number of post-distrb requests
+ // (distrib proc may batch differently then what we send)
+ assertThat(
+ "multi post-distrib cores",
+ record.postDistribCores.keySet(),
+ everyItem(isIn(LEADER_CORE_NAMES)));
+ assertThat("multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+ }
+ }
+
private static UpdateRequest createMultiDirectUpdates(final int numAdds, final int numDel) {
final UpdateRequest req = new UpdateRequest();
for (int i = 0; i < numAdds; i++) {