You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ep...@apache.org on 2021/11/08 22:28:18 UTC
[solr] branch main updated: SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)
This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new f589607 SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)
f589607 is described below
commit f589607e700a94544fc546f5d641b5bfd695f32b
Author: Michael Kosten <ma...@me.com>
AuthorDate: Mon Nov 8 14:28:11 2021 -0800
SOLR-15705: A deleteById request without _route_ param for compositeid router could be sent to all shard leaders (#288)
---
solr/CHANGES.txt | 3 +
.../processor/DistributedZkUpdateProcessor.java | 103 +++++++++++++--------
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 35 ++++++-
.../solr/common/cloud/CompositeIdRouter.java | 13 +++
4 files changed, 109 insertions(+), 45 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ac06435..fd82e10 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -164,6 +164,9 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)
* SOLR-15608: Remove deprecated methods, classes and constructors from solrj clients (janhoy)
+* SOLR-15705: A delete-by-id command is forwarded to all shards when using the CompositeId router with a router field
+ and the route is missing from the command. (Michael Kosten via Christine Poerschke, David Smiley, Eric Pugh)
+
Build
---------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index e031cf7..c58bca8 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -84,6 +84,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private Set<String> skippedCoreNodeNames;
private final String collection;
private boolean readOnlyCollection = false;
+ private boolean broadcastDeleteById = false;
// The cached immutable clusterState for the update... usually refreshed for each individual update.
// Different parts of this class used to request current clusterState views, which led to subtle bugs and race conditions
@@ -316,6 +317,14 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
setupRequest(cmd);
+ if (broadcastDeleteById && DistribPhase.NONE == DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM))) {
+ DocCollection coll = clusterState.getCollection(collection);
+ if (log.isDebugEnabled()) {
+ log.debug("The deleteById command for doc {} is missing the required route, broadcasting to leaders of other shards", cmd.getId());
+ }
+ forwardDelete(coll, cmd);
+ }
+
// check if client has requested minimum replication factor information. will set replicationTracker to null if
// we aren't the leader or subShardLeader
checkReplicationTracker(cmd);
@@ -383,47 +392,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (rollupReplicationTracker == null) {
rollupReplicationTracker = new RollupRequestReplicationTracker();
}
- boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
-
- ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
- outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
- outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- SolrParams params = req.getParams();
- String route = params.get(ShardParams._ROUTE_);
- Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
-
- List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
- for (Slice slice : slices) {
- String sliceName = slice.getName();
- Replica leader;
- try {
- leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
- }
-
- // TODO: What if leaders changed in the meantime?
- // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
-
- // Am I the leader for this slice?
- ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
- String leaderCoreNodeName = leader.getName();
- String coreNodeName = cloudDesc.getCoreNodeName();
- isLeader = coreNodeName.equals(leaderCoreNodeName);
-
- if (isLeader) {
- // don't forward to ourself
- leaderForAnyShard = true;
- } else {
- leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
- }
- }
-
- outParams.remove("commit"); // this will be distributed from the local commit
-
- cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
+ boolean leaderForAnyShard = forwardDelete(coll, cmd);
if (!leaderForAnyShard) {
return;
@@ -448,6 +417,54 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
super.doDeleteByQuery(cmd, replicas, coll);
}
+  /**
+   * Forwards a delete command to the leaders of the slices selected by the collection's router.
+   *
+   * <p>The slices are obtained from the router's {@code getSearchSlices} using the request's
+   * {@code _route_} parameter. The leader of each slice is looked up; every leader other than
+   * this core is collected, and the command is then distributed to the collected leaders with
+   * the distrib phase set to {@code TOLEADER} and the {@code commit} parameter removed.
+   *
+   * @param coll the collection whose router selects the target slices
+   * @param cmd the delete command to forward
+   * @return {@code true} if this core is the leader of at least one of the selected slices
+   * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the thread is interrupted while
+   *     looking up a slice leader; the thread's interrupt flag is restored before throwing
+   * @throws IOException declared by this method; presumably raised while distributing the
+   *     command (not confirmable from this hunk)
+   */
+  private boolean forwardDelete(DocCollection coll, DeleteUpdateCommand cmd) throws IOException {
+
+    boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+        zkController.getBaseUrl(), req.getCore().getName()));
+
+    SolrParams params = req.getParams();
+    String route = params.get(ShardParams._ROUTE_);
+    Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
+
+    List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
+    for (Slice slice : slices) {
+      String sliceName = slice.getName();
+      Replica leader;
+      try {
+        leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag: wrapping the exception would otherwise hide the
+        // interruption from code further up the call stack.
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
+      }
+
+      // TODO: What if leaders changed in the meantime?
+      // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
+
+      // Am I the leader for this slice?
+      // NOTE(review): isLeader is not a local of this method, so this assignment overwrites
+      // shared state on every iteration and ends up reflecting only the last slice examined.
+      // Confirm that callers do not depend on the value it held before this call.
+      ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
+      String leaderCoreNodeName = leader.getName();
+      String coreNodeName = cloudDesc.getCoreNodeName();
+      isLeader = coreNodeName.equals(leaderCoreNodeName);
+
+      if (isLeader) {
+        // don't forward to ourself
+        leaderForAnyShard = true;
+      } else {
+        leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
+      }
+    }
+
+    outParams.remove("commit"); // this will be distributed from the local commit
+
+    cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
+
+    return leaderForAnyShard;
+  }
+
+
@Override
protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas,
DocCollection coll) throws IOException {
@@ -607,6 +624,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
}
+ // if doc == null, then this is a DeleteById request with missing route, flag for forwarding to all shard leaders
+ if (doc == null && coll.getRouter() instanceof CompositeIdRouter && coll.getActiveSlicesMap().size() > 1) {
+ broadcastDeleteById = true;
+ }
}
DistribPhase phase =
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 65937d7..32591de 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -249,19 +249,24 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
try (SolrClient shard1 = getHttpSolrClient(docCol.getSlice("shard1").getLeader().getCoreUrl());
SolrClient shard2 = getHttpSolrClient(docCol.getSlice("shard2").getLeader().getCoreUrl())) {
- // Add three documents w/diff routes (all sent to shard1 leader's core)
+ // Add six documents w/diff routes (all sent to shard1 leader's core)
shard1.add(sdoc("id", "1", "routefield_s", "europe"));
shard1.add(sdoc("id", "3", "routefield_s", "europe"));
shard1.add(sdoc("id", "5", "routefield_s", "africa"));
+ shard1.add(sdoc("id", "7", "routefield_s", "europe"));
+ shard1.add(sdoc("id", "9", "routefield_s", "europe"));
+ shard1.add(sdoc("id", "11", "routefield_s", "africa"));
shard1.commit();
- // Add two documents w/diff routes (all sent to shard2 leader's core)
+ // Add four documents w/diff routes (all sent to shard2 leader's core)
+ shard2.add(sdoc("id", "8", "routefield_s", "africa"));
+ shard2.add(sdoc("id", "6", "routefield_s", "europe"));
shard2.add(sdoc("id", "4", "routefield_s", "africa"));
shard2.add(sdoc("id", "2", "routefield_s", "europe"));
shard2.commit();
- final AtomicInteger docCountsEurope = new AtomicInteger(3);
- final AtomicInteger docCountsAfrica = new AtomicInteger(2);
+ final AtomicInteger docCountsEurope = new AtomicInteger(6);
+ final AtomicInteger docCountsAfrica = new AtomicInteger(4);
// A re-usable helper to verify that the expected number of documents can be found based on _route_ key...
Runnable checkShardCounts = () -> {
@@ -298,6 +303,28 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
docCountsAfrica.decrementAndGet();
}
checkShardCounts.run();
+
+ // Tests for distributing delete by id when route is missing from the request
+ { // Send a delete request with no route to shard1 for document on shard2, should be distributed
+ final UpdateRequest deleteRequest = new UpdateRequest();
+ deleteRequest.deleteById("8");
+ shard1.request(deleteRequest);
+ shard1.commit();
+ docCountsAfrica.decrementAndGet();
+ }
+ checkShardCounts.run();
+
+ { // Multiple deleteById commands with missing route in a single request, should be distributed
+ final UpdateRequest deleteRequest = new UpdateRequest();
+ deleteRequest.deleteById("6");
+ deleteRequest.deleteById("11");
+ shard1.request(deleteRequest);
+ shard1.commit();
+ docCountsEurope.decrementAndGet();
+ docCountsAfrica.decrementAndGet();
+ }
+ checkShardCounts.run();
+
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 4b7acc1..d13c06d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -101,6 +101,19 @@ public class CompositeIdRouter extends HashBasedRouter {
return new KeyParser(id).getHash();
}
+  /**
+   * Picks the slice for a document id, with special handling for a delete-by-id that has no route.
+   *
+   * <p>A delete-by-id arrives without a document ({@code sdoc == null}). If it also has no
+   * {@code route} and the collection defines a route field, the target cannot be determined
+   * here and {@code null} is returned. In every other case the decision is delegated to the
+   * superclass implementation.
+   *
+   * @return the target slice, or {@code null} for a route-less delete-by-id on a collection that
+   *     has a route field
+   */
+  @Override
+  public Slice getTargetSlice(String id, SolrInputDocument sdoc, String route, SolrParams params, DocCollection collection) {
+    final boolean deleteByIdWithoutRoute = (sdoc == null) && (route == null);
+    // Short-circuit keeps the route-field lookup limited to the route-less delete-by-id case.
+    if (deleteByIdWithoutRoute && getRouteField(collection) != null) {
+      return null;
+    }
+    return super.getTargetSlice(id, sdoc, route, params, collection);
+  }
+
/**
* Get Range for a given CompositeId based route key