You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2023/01/07 00:54:17 UTC

[solr] 08/12: Add IsUpdateRequest.isSendToLeaders() such that both it and client config must be true for shards.preference to be overridden

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch jira/SOLR-6312
in repository https://gitbox.apache.org/repos/asf/solr.git

commit 345afd12fa2d03b795bcc6de0ee1639476e2e683
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Thu Jan 5 15:26:46 2023 -0700

    Add IsUpdateRequest.isSendToLeaders() such that both it and client config must be true for shards.preference to be overridden
---
 .../solr/client/solrj/impl/CloudSolrClient.java    |  24 +++-
 .../solrj/request/AbstractUpdateRequest.java       |  11 ++
 .../solr/client/solrj/request/IsUpdateRequest.java |  15 ++-
 .../impl/SendUpdatesToLeadersOverrideTest.java     | 135 +++++++++++++++++++--
 4 files changed, 170 insertions(+), 15 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index c05fcf70b70..900aaa9108a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -1044,9 +1044,7 @@ public abstract class CloudSolrClient extends SolrClient {
     boolean sendToLeaders = false;
 
     if (request instanceof IsUpdateRequest) {
-      sendToLeaders =
-          // nocommit: also check request.isSendToLeaders() (still to be added)
-          this.isUpdatesToLeaders();
+      sendToLeaders = ((IsUpdateRequest) request).isSendToLeaders() && this.isUpdatesToLeaders();
 
       // Check if we can do a "directUpdate" ...
       if (sendToLeaders && request instanceof UpdateRequest) {
@@ -1206,11 +1204,31 @@ public abstract class CloudSolrClient extends SolrClient {
     return uniqueNames;
   }
 
+  /**
+   * If true, this client has been configured such that it will generally prefer to send {@link
+   * IsUpdateRequest} requests to a shard leader, if and only if {@link
+   * IsUpdateRequest#isSendToLeaders} is also true. If false, then this client has been configured
+   * to obey normal routing preferences when dealing with {@link IsUpdateRequest} requests.
+   *
+   * @see #isDirectUpdatesToLeadersOnly
+   */
   public boolean isUpdatesToLeaders() {
     return updatesToLeaders;
   }
 
   /**
+   * If true, this client has been configured such that "direct updates" will <em>only</em> be sent
+   * to the current leader of the corresponding shard, and will not be retried with other replicas.
+   * This method has no effect if {@link #isUpdatesToLeaders()} or {@link
+   * IsUpdateRequest#isSendToLeaders} returns false.
+   *
+   * <p>A "direct update" is any update that can be sent directly to a single shard, and does not
+   * need to be broadcast to every shard. (Example: document updates or "delete by id" when using
+   * the default router; non-direct updates are things like commits and "delete by query").
+   *
+   * <p>NOTE: If a single {@link UpdateRequest} contains multiple "direct updates" for different
+   * shards, this client may break the request up and merge the responses.
+   *
    * @return true if direct updates are sent to shard leaders only
    */
   public boolean isDirectUpdatesToLeadersOnly() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
index 352fd290f59..733502353dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java
@@ -159,4 +159,15 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse>
     this.commitWithin = commitWithin;
     return this;
   }
+
+  private boolean sendToLeaders = true;
+
+  public boolean isSendToLeaders() {
+    return sendToLeaders;
+  }
+
+  public AbstractUpdateRequest setSendToLeaders(final boolean sendToLeaders) {
+    this.sendToLeaders = sendToLeaders;
+    return this;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
index 5e8c8aff854..54f449e1358 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/IsUpdateRequest.java
@@ -16,5 +16,18 @@
  */
 package org.apache.solr.client.solrj.request;
 
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+
 /** Marker class so that we can determine which requests are updates. */
-public interface IsUpdateRequest {}
+public interface IsUpdateRequest {
+
+  /**
+   * Indicates if clients should make attempts to route this request to a shard leader, overriding
+   * typical client routing preferences for requests. Defaults to true.
+   *
+   * @see CloudSolrClient#isUpdatesToLeaders
+   */
+  default boolean isSendToLeaders() {
+    return true;
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
index 0daaa76dcb0..044c24f339d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/SendUpdatesToLeadersOverrideTest.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -165,13 +166,13 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
   }
 
   /**
-   * Given an {@link UpdateRequest} and a {@link SolrClient}, processes that request against that
-   * client while {@link TrackingUpdateProcessorFactory} is recording, does some basic validation,
-   * then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code> coreNames to
-   * the specified validators
+   * Given an {@link AbstractUpdateRequest} and a {@link SolrClient}, processes that request against
+   * that client while {@link TrackingUpdateProcessorFactory} is recording, does some basic
+   * validation, then passes the recorded <code>pre-distrib</code> and <code>post-distrib</code>
+   * coreNames to the specified validators
    */
   private static RecordingResults assertUpdateWithRecording(
-      final UpdateRequest req, final SolrClient client) throws Exception {
+      final AbstractUpdateRequest req, final SolrClient client) throws Exception {
 
     TrackingUpdateProcessorFactory.startRecording("pre-distrib");
     TrackingUpdateProcessorFactory.startRecording("post-distrib");
@@ -194,10 +195,11 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
   }
 
   /**
-   * Since {@link UpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for setting
-   * <code>shards.preference=replica.type:PULL</code> on the input req, and then returning that req
+   * Since {@link AbstractUpdateRequest#setParam} isn't a fluent API, this is a wrapper helper for
+   * setting <code>shards.preference=replica.type:PULL</code> on the input req, and then returning
+   * that req
    */
-  private static UpdateRequest prefPull(final UpdateRequest req) {
+  private static AbstractUpdateRequest prefPull(final AbstractUpdateRequest req) {
     req.setParam("shards.preference", "replica.type:PULL");
     return req;
   }
@@ -205,17 +207,18 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
   // nocommit: - test CloudHttp2SolrClient as well
 
   // basic sanity check of expected default behavior
-  public void testUpdatesDefaultToLeaders() throws Exception {
+  public void testClientThatDefaultsToLeaders() throws Exception {
     try (CloudSolrClient client =
         new CloudLegacySolrClient.Builder(
                 Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
             .sendUpdatesOnlyToShardLeaders()
             .build()) {
       checkUpdatesDefaultToLeaders(client);
+      checkUpdatesWithSendToLeadersFalse(client);
     }
   }
 
-  public void testUpdatesWithShardsPrefPull() throws Exception {
+  public void testClientThatDoesNotDefaultToLeaders() throws Exception {
     try (CloudSolrClient client =
         new CloudLegacySolrClient.Builder(
                 Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
@@ -224,6 +227,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
             .sendUpdatesToAllReplicasInShard()
             .build()) {
       checkUpdatesWithShardsPrefPull(client);
+      checkUpdatesWithSendToLeadersFalse(client);
     }
   }
 
@@ -334,7 +338,7 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
   }
 
   /**
-   * Given a SolrClient, sends various updates using {#link #prefPull} and asserts expecations that
+   * Given a SolrClient, sends various updates using {@link #prefPull} and asserts expectations that
    * these requests will be initially sent to PULL replcias
    */
   private void checkUpdatesWithShardsPrefPull(final CloudSolrClient client) throws Exception {
@@ -441,6 +445,115 @@ public class SendUpdatesToLeadersOverrideTest extends SolrCloudTestCase {
     }
   }
 
+  /**
+   * Given a SolrClient, sends various updates where {@link IsUpdateRequest#isSendToLeaders} returns
+   * false, and asserts expectations that requests using {@link #prefPull} are all sent to PULL
+   * replicas, regardless of how the client is configured.
+   */
+  private void checkUpdatesWithSendToLeadersFalse(final CloudSolrClient client) throws Exception {
+    { // single doc add...
+      final RecordingResults add =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().add(sdoc("id", "hoss"))).setSendToLeaders(false),
+              client);
+
+      // ...should start on (some) PULL replica, since we asked nicely
+      assertThat("add pre-distrib size", add.preDistribCores.keySet(), hasSize(1));
+      assertThat(
+          "add pre-distrib must be PULL",
+          add.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      assertThat("add pre-distrib size", add.preDistribRequests.keySet(), hasSize(1));
+      assertThat("add pre-distrib size", add.preDistribCommands, hasSize(1));
+
+      // ...then be routed to single leader for this id
+      assertThat("add post-distrib size", add.postDistribCores.keySet(), hasSize(1));
+      assertThat(
+          "add post-distrib must be leader",
+          add.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      assertThat("add post-distrib size", add.postDistribRequests.keySet(), hasSize(1));
+      assertThat("add post-distrib size", add.postDistribCommands, hasSize(1));
+
+      // A DBI should also start on (some) PULL replica,  since we asked nicely.
+      //
+      // then it should be distributed to whatever leader our add doc (for the same id) was sent to
+      final RecordingResults del =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().deleteById("hoss")).setSendToLeaders(false), client);
+      assertThat("del pre-distrib size", del.preDistribCores.keySet(), hasSize(1));
+      assertThat(
+          "del pre-distrib must be PULL",
+          del.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      assertThat("del pre-distrib size", del.preDistribRequests.keySet(), hasSize(1));
+      assertThat("del pre-distrib size", del.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "add and del should have same post-distrib leader",
+          add.postDistribCores.keySet(),
+          del.postDistribCores.keySet());
+      assertThat("del post-distrib size", del.postDistribRequests.keySet(), hasSize(1));
+      assertThat("del post-distrib size", del.postDistribCommands, hasSize(1));
+    }
+
+    { // DBQ start on (some) PULL replica, since we asked nicely, then be routed to all leaders
+      final RecordingResults record =
+          assertUpdateWithRecording(
+              prefPull(new UpdateRequest().deleteByQuery("*:*")).setSendToLeaders(false), client);
+
+      assertThat("dbq pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      assertThat(
+          "dbq pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      assertThat("dbq pre-distrib size", record.preDistribRequests.keySet(), hasSize(1));
+      assertThat("dbq pre-distrib size", record.preDistribCommands, hasSize(1));
+
+      assertEquals(
+          "dbq post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      assertThat(
+          "dbq post-distrib size",
+          record.postDistribRequests.keySet(),
+          hasSize(LEADER_CORE_NAMES.size()));
+      assertThat(
+          "dbq post-distrib size", record.postDistribCommands, hasSize(LEADER_CORE_NAMES.size()));
+    }
+
+    { // When sendToLeaders is disabled, a single UpdateRequest containing multiple adds
+      // should still only go to one replica for all the "pre" commands, then be forwarded
+      // to the respective leaders for the "post" commands
+
+      final RecordingResults record =
+          assertUpdateWithRecording(
+              prefPull(createMultiDirectUpdates(100, 10)).setSendToLeaders(false), client);
+
+      assertThat("multi pre-distrib size", record.preDistribCores.keySet(), hasSize(1));
+      assertThat(
+          "multi pre-distrib must be PULL",
+          record.preDistribCores.keySet(),
+          everyItem(isIn(PULL_REPLICA_CORE_NAMES)));
+      assertThat("multi pre-distrib req size", record.preDistribRequests.keySet(), hasSize(1));
+      assertThat("multi pre-distrib command size", record.preDistribCommands, hasSize(100 + 10));
+
+      assertEquals(
+          "multi post-distrib must be all leaders",
+          LEADER_CORE_NAMES,
+          record.postDistribCores.keySet());
+      // NOTE: Don't assume our docIds are spread across multi-shards...
+      //
+      // We make no assertion about number of post-distrib requests
+      // (distrib proc may batch differently than what we send)
+      assertThat(
+          "multi post-distrib cores",
+          record.postDistribCores.keySet(),
+          everyItem(isIn(LEADER_CORE_NAMES)));
+      assertThat("multi post-distrib command size", record.postDistribCommands, hasSize(100 + 10));
+    }
+  }
+
   private static UpdateRequest createMultiDirectUpdates(final int numAdds, final int numDel) {
     final UpdateRequest req = new UpdateRequest();
     for (int i = 0; i < numAdds; i++) {