You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ep...@apache.org on 2021/11/08 22:28:18 UTC

[solr] branch main updated: SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)

This is an automated email from the ASF dual-hosted git repository.

epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new f589607  SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)
f589607 is described below

commit f589607e700a94544fc546f5d641b5bfd695f32b
Author: Michael Kosten <ma...@me.com>
AuthorDate: Mon Nov 8 14:28:11 2021 -0800

    SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)
---
 solr/CHANGES.txt                                   |   3 +
 .../processor/DistributedZkUpdateProcessor.java    | 103 +++++++++++++--------
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   |  35 ++++++-
 .../solr/common/cloud/CompositeIdRouter.java       |  13 +++
 4 files changed, 109 insertions(+), 45 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ac06435..fd82e10 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -164,6 +164,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)
 
 * SOLR-15608: Remove deprecated methods, classes and constructors from solrj clients (janhoy)
 
+* SOLR-15705: A delete-by-id command is now forwarded to all shard leaders when using the CompositeId router with a
+  router field and the _route_ is missing from the command. (Michael Kosten via Christine Poerschke, David Smiley, Eric Pugh)
+
 Build
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index e031cf7..c58bca8 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -84,6 +84,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   private Set<String> skippedCoreNodeNames;
   private final String collection;
   private boolean readOnlyCollection = false;
+  private boolean broadcastDeleteById = false;
 
   // The cached immutable clusterState for the update... usually refreshed for each individual update.
   // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
@@ -316,6 +317,14 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
     setupRequest(cmd);
 
+    if (broadcastDeleteById && DistribPhase.NONE == DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM))) {
+      DocCollection coll = clusterState.getCollection(collection);
+      if (log.isDebugEnabled()) {
+        log.debug("The deleteById command for doc {} is missing the required route, broadcasting to leaders of other shards", cmd.getId());
+      }
+      forwardDelete(coll, cmd);
+    }
+
     // check if client has requested minimum replication factor information. will set replicationTracker to null if
     // we aren't the leader or subShardLeader
     checkReplicationTracker(cmd);
@@ -383,47 +392,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       if (rollupReplicationTracker == null) {
         rollupReplicationTracker = new RollupRequestReplicationTracker();
       }
-      boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
-
-      ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
-      outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
-      outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
-          zkController.getBaseUrl(), req.getCore().getName()));
-
-      SolrParams params = req.getParams();
-      String route = params.get(ShardParams._ROUTE_);
-      Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
-
-      List<SolrCmdDistributor.Node> leaders =  new ArrayList<>(slices.size());
-      for (Slice slice : slices) {
-        String sliceName = slice.getName();
-        Replica leader;
-        try {
-          leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
-        } catch (InterruptedException e) {
-          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
-        }
-
-        // TODO: What if leaders changed in the meantime?
-        // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
-
-        // Am I the leader for this slice?
-        ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
-        String leaderCoreNodeName = leader.getName();
-        String coreNodeName = cloudDesc.getCoreNodeName();
-        isLeader = coreNodeName.equals(leaderCoreNodeName);
-
-        if (isLeader) {
-          // don't forward to ourself
-          leaderForAnyShard = true;
-        } else {
-          leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
-        }
-      }
-
-      outParams.remove("commit"); // this will be distributed from the local commit
-
-      cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
+      boolean leaderForAnyShard = forwardDelete(coll, cmd);
 
       if (!leaderForAnyShard) {
         return;
@@ -448,6 +417,54 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     super.doDeleteByQuery(cmd, replicas, coll);
   }
 
+  private boolean forwardDelete(DocCollection coll, DeleteUpdateCommand cmd) throws IOException {
+
+    boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+        zkController.getBaseUrl(), req.getCore().getName()));
+
+    SolrParams params = req.getParams();
+    String route = params.get(ShardParams._ROUTE_);
+    Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
+
+    List<SolrCmdDistributor.Node> leaders =  new ArrayList<>(slices.size());
+    for (Slice slice : slices) {
+      String sliceName = slice.getName();
+      Replica leader;
+      try {
+        leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
+      } catch (InterruptedException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
+      }
+
+      // TODO: What if leaders changed in the meantime?
+      // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
+
+      // Am I the leader for this slice?
+      ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
+      String leaderCoreNodeName = leader.getName();
+      String coreNodeName = cloudDesc.getCoreNodeName();
+      isLeader = coreNodeName.equals(leaderCoreNodeName);
+
+      if (isLeader) {
+        // don't forward to ourself
+        leaderForAnyShard = true;
+      } else {
+        leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
+      }
+    }
+
+    outParams.remove("commit"); // this will be distributed from the local commit
+
+    cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
+
+    return leaderForAnyShard;
+  }
+
+
   @Override
   protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas,
                                         DocCollection coll) throws IOException {
@@ -607,6 +624,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       if (slice == null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
       }
+      // if doc == null, then this is a DeleteById request with missing route, flag for forwarding to all shard leaders
+      if (doc == null && coll.getRouter() instanceof CompositeIdRouter && coll.getActiveSlicesMap().size() > 1) {
+        broadcastDeleteById = true;
+      }
     }
 
     DistribPhase phase =
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 65937d7..32591de 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -249,19 +249,24 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl());
          SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) {
 
-      // Add three documents w/diff routes (all sent to shard1 leader's core)
+      // Add six documents w/diff routes (all sent to shard1 leader's core)
       shard1.add(sdoc("id", "1", "routefield_s", "europe"));
       shard1.add(sdoc("id", "3", "routefield_s", "europe"));
       shard1.add(sdoc("id", "5", "routefield_s", "africa"));
+      shard1.add(sdoc("id", "7", "routefield_s", "europe"));
+      shard1.add(sdoc("id", "9", "routefield_s", "europe"));
+      shard1.add(sdoc("id", "11", "routefield_s", "africa"));
       shard1.commit();
 
-      // Add two documents w/diff routes (all sent to shard2 leader's core)
+      // Add four documents w/diff routes (all sent to shard2 leader's core)
+      shard2.add(sdoc("id", "8", "routefield_s", "africa"));
+      shard2.add(sdoc("id", "6", "routefield_s", "europe"));
       shard2.add(sdoc("id", "4", "routefield_s", "africa"));
       shard2.add(sdoc("id", "2", "routefield_s", "europe"));
       shard2.commit();
       
-      final AtomicInteger docCountsEurope = new AtomicInteger(3);
-      final AtomicInteger docCountsAfrica = new AtomicInteger(2);
+      final AtomicInteger docCountsEurope = new AtomicInteger(6);
+      final AtomicInteger docCountsAfrica = new AtomicInteger(4);
 
       // A re-usable helper to verify that the expected number of documents can be found based on _route_ key...
       Runnable checkShardCounts = () -> {
@@ -298,6 +303,28 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
         docCountsAfrica.decrementAndGet();
       }
       checkShardCounts.run();
+
+      // Tests for distributing delete by id when route is missing from the request
+      { // Send a delete request with no route to shard1 for document on shard2, should be distributed
+        final UpdateRequest deleteRequest = new UpdateRequest();
+        deleteRequest.deleteById("8");
+        shard1.request(deleteRequest);
+        shard1.commit();
+        docCountsAfrica.decrementAndGet();
+      }
+      checkShardCounts.run();
+
+      { // Multiple deleteById commands with missing route in a single request, should be distributed
+        final UpdateRequest deleteRequest = new UpdateRequest();
+        deleteRequest.deleteById("6");
+        deleteRequest.deleteById("11");
+        shard1.request(deleteRequest);
+        shard1.commit();
+        docCountsEurope.decrementAndGet();
+        docCountsAfrica.decrementAndGet();
+      }
+      checkShardCounts.run();
+
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 4b7acc1..d13c06d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -101,6 +101,19 @@ public class CompositeIdRouter extends HashBasedRouter {
     return new KeyParser(id).getHash();
   }
 
+  @Override
+  public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) {
+    // if this is a delete-by-id (sdoc==null), then return null if the route is missing and there is a route field defined.
+    // otherwise, we will return the slice using the hash on the id
+    if (sdoc == null && route == null) {
+      String shardFieldName = getRouteField(collection);
+      if (shardFieldName != null) {
+        return null;
+      }
+    }
+    return super.getTargetSlice(id, sdoc, route, params, collection);
+  }
+
 
   /**
    * Get Range for a given CompositeId based route key