Posted to commits@solr.apache.org by ma...@apache.org on 2023/06/12 16:51:58 UTC

[solr-sandbox] branch crossdc-wip updated: Add additional unsupported delete-by-query options. (#61)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new e486b5b  Add additional unsupported delete-by-query options. (#61)
e486b5b is described below

commit e486b5bd2cca26f93be64c3a3383f1f0bfc7958d
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Mon Jun 12 11:51:52 2023 -0500

    Add additional unsupported delete-by-query options. (#61)
---
 CROSSDC.md                                         |  11 +-
 .../solr/crossdc/common/KafkaCrossDcConf.java      |   3 +
 .../update/processor/MirroringUpdateProcessor.java | 666 +++++++++++----------
 .../MirroringUpdateRequestProcessorFactory.java    |   4 +-
 .../solr/crossdc/DeleteByQueryIntegrationTest.java | 428 +++++++++++++
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |  12 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |  13 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  12 +-
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |  16 +-
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  13 +-
 10 files changed, 840 insertions(+), 338 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index cf8deaf..fe20e18 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -186,11 +186,16 @@ For situations where you do want to control and enforce a single updateRequestPr
 
 ## Limitations
 
-- Delete-By-Query is not officially supported.
+- Delete-By-Query is not officially supported; however, there are several options you can attempt to use if needed.
+  The option used is controlled with the update parameter "dbqMethod" (see the example below). A different default can be configured by setting the CrossDC configuration parameter defaultDBQMethod to the desired method value.
+1. default: Used when no dbqMethod parameter is provided, or when it is provided with the value "default". This method paginates through the documents matching the delete query, fetching a batch at a time (1000 by default, configurable via the solr.crossdc.dbq_rows system property) and deleting them by id until all matching documents have been deleted.
 
-    - Work-In-Progress: A non-efficient option to issue multiple delete by id queries using the results of a given standard query.
+2. convert_no_paging: If the dbqMethod parameter is provided with the value "convert_no_paging", the delete query is converted into individual delete-by-id commands. This method fetches all documents matching the delete query at once (up to 10000) and then deletes each document one by one.
 
-    - Simply forwarding a real Delete-By-Query could also be reasonable if it is not strictly reliant on not being reordered with other requests.
+3. delete_by_query: If the dbqMethod parameter is provided with the value "delete_by_query", the delete by query is performed as a single operation. It deletes all documents matching the delete query at once, both locally and in the mirror cluster (via Kafka).
+   With this method, if the system goes down at a bad time, there are no guarantees about whether the DBQ is replayed on recovery or how it is ordered relative to other document updates or additions.
+
+4. delete_by_query_local: If the dbqMethod parameter is provided with the value "delete_by_query_local", the delete by query is performed as a single operation, but only in the local cluster. The deletion is not mirrored to the other clusters.
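+
+For example, with SolrJ the method can be selected per request by setting the "dbqMethod" update parameter. This is a minimal sketch based on the integration tests in this commit; `client` is assumed to be a SolrClient connected to the primary cluster, and the query and collection name are placeholders:
+
+```java
+ModifiableSolrParams params = new ModifiableSolrParams();
+params.set("dbqMethod", "convert_no_paging"); // or "default", "delete_by_query", "delete_by_query_local"
+
+UpdateRequest request = new UpdateRequest();
+request.setParams(params);
+request.deleteByQuery("inStock:false");  // placeholder delete query
+request.process(client, "collection1");  // placeholder collection name
+client.commit("collection1");
+```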
 
 
 
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 251754d..5295d0a 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -57,6 +57,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
 
+  public static final String DEFAULT_DBQ_METHOD = "defaultDBQMethod";
+
 
   public static final String TOPIC_NAME = "topicName";
 
@@ -131,6 +133,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
             new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
             new ConfigProperty(DELIVERY_TIMEOUT_MS, DEFAULT_DELIVERY_TIMEOUT_MS),
+            new ConfigProperty(DEFAULT_DBQ_METHOD, null),
 
             // Consumer only zkConnectString
             new ConfigProperty(ZK_CONNECT_STRING, null),
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 74ee1eb..6ded26d 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -8,10 +8,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.*;
 import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.*;
 import org.apache.solr.request.SolrQueryRequest;
@@ -35,338 +32,403 @@ import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
 public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * Flag indicating whether this instance creates and submits a mirrored request. This override is
-   * necessary to prevent circular mirroring between coupled cluster running this processor.
-   */
-  private final boolean doMirroring;
-  private final RequestMirroringHandler requestMirroringHandler;
-
-  /**
-   * The mirrored request starts as null, gets created and appended to at each process() call,
-   * then submitted on finish().
-   */
-  private UpdateRequest mirrorRequest;
-  private long mirrorRequestBytes;
-  private final SolrParams mirrorParams;
-
-
-  /**
-   * Controls whether docs exceeding the max-size (and thus cannot be mirrored) are indexed locally.
-   */
-  private final boolean indexUnmirrorableDocs;
-  private final long maxMirroringBatchSizeBytes;
-
-
-  /**
-   * The distributed processor downstream from us so we can establish if we're running on a leader shard
-   */
-  //private DistributedUpdateProcessor distProc;
-
-  /**
-   * Distribution phase of the incoming requests
-   */
-  private DistributedUpdateProcessor.DistribPhase distribPhase;
-
-  public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
-      final boolean indexUnmirrorableDocs,
-      final long maxMirroringBatchSizeBytes,
-      final SolrParams mirroredReqParams,
-      final DistributedUpdateProcessor.DistribPhase distribPhase,
-      final RequestMirroringHandler requestMirroringHandler) {
-    super(next);
-    this.doMirroring = doMirroring;
-    this.indexUnmirrorableDocs = indexUnmirrorableDocs;
-    this.maxMirroringBatchSizeBytes = maxMirroringBatchSizeBytes;
-    this.mirrorParams = mirroredReqParams;
-    this.distribPhase = distribPhase;
-    this.requestMirroringHandler = requestMirroringHandler;
-
-    // Find the downstream distributed update processor
-
-  }
-
-  private UpdateRequest createAndOrGetMirrorRequest() {
-    if (mirrorRequest == null) {
-      mirrorRequest = new UpdateRequest();
-      mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
-      mirrorRequestBytes = 0L;
-    }
-    if (log.isDebugEnabled())
-      log.debug("createOrGetMirrorRequest={}",
-          mirrorRequest);
-    return mirrorRequest;
-  }
-
-  @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
-
-    final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
-    doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
-    final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
-    final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringBatchSizeBytes;
-    if (tooLargeForKafka && !indexUnmirrorableDocs) {
-      log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable.", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
-    } else {
-      super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
-    }
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    // submit only from the leader shards so we mirror each doc once
-    boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
-    if (doMirroring && isLeader) {
-      if (tooLargeForKafka) {
-        log.error("Skipping mirroring of doc {} because estimated size exceeds batch size limit {} bytes", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
-      } else {
-        createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
-        mirrorRequestBytes += estimatedDocSizeInBytes;
-      }
-    }
+    /**
+     * Flag indicating whether this instance creates and submits a mirrored request. This override is
+     * necessary to prevent circular mirroring between coupled clusters running this processor.
+     */
+    private final boolean doMirroring;
+    private final RequestMirroringHandler requestMirroringHandler;
+    private final String defaultDBQMethod;
 
-    if (log.isDebugEnabled())
-      log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
-  }
+    /**
+     * The mirrored request starts as null, gets created and appended to at each process() call,
+     * then submitted on finish().
+     */
+    private UpdateRequest mirrorRequest;
+    private long mirrorRequestBytes;
+    private final SolrParams mirrorParams;
 
-  @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
-    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
 
-      CloudDescriptor cloudDesc =
-          cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
-      String collection = cloudDesc.getCollectionName();
+    /**
+     * Controls whether docs exceeding the max-size (and thus cannot be mirrored) are indexed locally.
+     */
+    private final boolean indexUnmirrorableDocs;
+    private final long maxMirroringBatchSizeBytes;
+
 
-      HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+    /**
+     * The distributed processor downstream from us so we can establish if we're running on a leader shard
+     */
+    //private DistributedUpdateProcessor distProc;
 
-      try (HttpSolrClient client =
-          new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) {
+    /**
+     * Distribution phase of the incoming requests
+     */
+    private DistributedUpdateProcessor.DistribPhase distribPhase;
 
-        String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+    public enum DBQ_Method {
+        CONVERT_NO_PAGING("convert_no_paging"),
+        DEFAULT("default"),
+        DELETE_BY_QUERY("delete_by_query"),
+        DELETE_BY_QUERY_LOCAL("delete_by_query_local");
 
-        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
-        SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
-        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+        private final String value;
 
-        int cnt = 1;
-        boolean done = false;
-        while (!done) {
-          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
-          QueryResponse rsp =
-              client.query(collection, q);
-          String nextCursorMark = rsp.getNextCursorMark();
+        DBQ_Method(String value) {
+            this.value = value;
+        }
 
-          if (log.isDebugEnabled()) {
-            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt,
-                rsp.getResults());
-            cnt++;
-          }
+        public String getValue() {
+            return value;
+        }
 
-          processDBQResults(client, collection, uniqueField, rsp);
-          if (cursorMark.equals(nextCursorMark)) {
-            done = true;
-          }
-          cursorMark = nextCursorMark;
+        public static DBQ_Method fromString(String value) {
+            if (value == null) {
+                return DEFAULT;
+            }
+            for (DBQ_Method method : DBQ_Method.values()) {
+                if (method.getValue().equals(value)) {
+                    return method;
+                }
+            }
+            throw new IllegalArgumentException("Invalid dbqMethod: " + value);
         }
-      } catch (SolrServerException e) {
-        throw new SolrException(SERVER_ERROR, e);
-      }
+    }
+
+
+    public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+                                    final boolean indexUnmirrorableDocs,
+                                    final long maxMirroringBatchSizeBytes,
+                                    final SolrParams mirroredReqParams,
+                                    final DistributedUpdateProcessor.DistribPhase distribPhase,
+                                    final RequestMirroringHandler requestMirroringHandler, String defaultDBQMethod) {
+        super(next);
+        this.doMirroring = doMirroring;
+        this.indexUnmirrorableDocs = indexUnmirrorableDocs;
+        this.maxMirroringBatchSizeBytes = maxMirroringBatchSizeBytes;
+        this.mirrorParams = mirroredReqParams;
+        this.distribPhase = distribPhase;
+        this.requestMirroringHandler = requestMirroringHandler;
+        this.defaultDBQMethod = defaultDBQMethod;
+        // Find the downstream distributed update processor
 
-      return;
     }
-    super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
-
-    if (doMirroring) {
-      boolean isLeader = false;
-      if (cmd.isDeleteById()) {
-        // deleteById requests runs once per leader, so we just submit the request from the leader shard
-        isLeader = isLeader(cmd.getReq(),  ((DeleteUpdateCommand)cmd).getId(), null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
-            ShardParams._ROUTE_), null);
-        if (isLeader) {
-          createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+
+    private UpdateRequest createAndOrGetMirrorRequest() {
+        if (mirrorRequest == null) {
+            mirrorRequest = new UpdateRequest();
+            mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+            mirrorRequestBytes = 0L;
         }
         if (log.isDebugEnabled())
-          log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd);
-      } else {
-        // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
-        // In general there's no way to guarantee that these run identically on the mirror since there are no
-        // external doc versions.
-        // TODO: Can we actually support this considering DBQs aren't versioned.
-
-        if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
-          createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+            log.debug("createOrGetMirrorRequest={}",
+                    mirrorRequest);
+        return mirrorRequest;
+    }
+
+    @Override
+    public void processAdd(final AddUpdateCommand cmd) throws IOException {
+
+        final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+        doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+        final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
+        final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringBatchSizeBytes;
+        if (tooLargeForKafka && !indexUnmirrorableDocs) {
+            log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable.", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
+        } else {
+            super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
         }
-        if (log.isDebugEnabled())
-          log.debug("processDelete doMirroring={} cmd={}", true, cmd);
-      }
 
+        // submit only from the leader shards so we mirror each doc once
+        boolean isLeader = isLeader(cmd.getReq(), cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
+        if (doMirroring && isLeader) {
+            if (tooLargeForKafka) {
+                log.error("Skipping mirroring of doc {} because estimated size exceeds batch size limit {} bytes", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
+            } else {
+                createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+                mirrorRequestBytes += estimatedDocSizeInBytes;
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
     }
-  }
-
-  private static void processDBQResults(SolrClient client, String collection, String uniqueField,
-      QueryResponse rsp)
-      throws SolrServerException, IOException {
-    SolrDocumentList results = rsp.getResults();
-    List<String> ids = new ArrayList<>(results.size());
-    results.forEach(entries -> {
-      String id = entries.getFirstValue(uniqueField).toString();
-      ids.add(id);
-    });
-    if (ids.size() > 0) {
-      client.deleteById(collection, ids);
+
+    @Override
+    public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+        String dbqMethod = cmd.getReq().getParams().get("dbqMethod");
+        if (dbqMethod == null) {
+            dbqMethod = defaultDBQMethod;
+        }
+        DBQ_Method dbqMethodEnum = DBQ_Method.fromString(dbqMethod);
+
+
+        if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+            CloudDescriptor cloudDesc = cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+            String collection = cloudDesc.getCollectionName();
+            HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+
+            try (HttpSolrClient client = new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) {
+                String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+                switch (dbqMethodEnum) {
+                    case DEFAULT:
+                        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+                        SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+                        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+                        int cnt = 1;
+                        boolean done = false;
+                        while (!done) {
+                            q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+                            QueryResponse rsp =
+                                    client.query(collection, q);
+                            String nextCursorMark = rsp.getNextCursorMark();
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt,
+                                        rsp.getResults());
+                                cnt++;
+                            }
+
+                            processDBQResults(client, collection, uniqueField, rsp);
+                            if (cursorMark.equals(nextCursorMark)) {
+                                done = true;
+                            }
+                            cursorMark = nextCursorMark;
+                        }
+                        break;
+                    case CONVERT_NO_PAGING:
+                        rows = 10000;
+                        q = new SolrQuery(cmd.query).setRows(rows).setFields(uniqueField);
+                        QueryResponse rsp = client.query(collection, q);
+
+                        // Convert query to delete-by-id commands
+                        for (SolrDocument result : rsp.getResults()) {
+                            DeleteUpdateCommand deleteCmd = new DeleteUpdateCommand(cmd.getReq());
+                            deleteCmd.setId((String) result.getFieldValue(uniqueField));
+                            super.processDelete(deleteCmd); // Execute locally
+                            createAndOrGetMirrorRequest().deleteById((String) result.getFieldValue(uniqueField)); // Send to kafka
+                        }
+                        break;
+                    case DELETE_BY_QUERY:
+                        super.processDelete(cmd); // Execute delete by query locally
+                        createAndOrGetMirrorRequest().deleteByQuery(cmd.query); // Send to kafka
+                        break;
+                    case DELETE_BY_QUERY_LOCAL:
+                        super.processDelete(cmd); // Execute delete by query locally only
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Invalid dbqMethod: " + dbqMethod);
+                }
+
+
+                return;
+            } catch (SolrServerException e) {
+                throw new SolrException(SERVER_ERROR, e);
+            }
+        }
+
+        super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
+
+        if (doMirroring) {
+            boolean isLeader = false;
+            if (cmd.isDeleteById()) {
+                // deleteById requests run once per leader, so we just submit the request from the leader shard
+                isLeader = isLeader(cmd.getReq(), ((DeleteUpdateCommand) cmd).getId(), null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+                        ShardParams._ROUTE_), null);
+                if (isLeader) {
+                    createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+                }
+                if (log.isDebugEnabled())
+                    log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd);
+            } else {
+                // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
+                // In general there's no way to guarantee that these run identically on the mirror since there are no
+                // external doc versions.
+                // TODO: Can we actually support this considering DBQs aren't versioned.
+
+                if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
+                    createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+                }
+                if (log.isDebugEnabled())
+                    log.debug("processDelete doMirroring={} cmd={}", true, cmd);
+            }
+
+        }
     }
-  }
-
-  private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
-    CloudDescriptor cloudDesc =
-        req.getCore().getCoreDescriptor().getCloudDescriptor();
-    String collection = cloudDesc.getCollectionName();
-    ClusterState clusterState =
-        req.getCore().getCoreContainer().getZkController().getClusterState();
-    DocCollection coll = clusterState.getCollection(collection);
-    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
-
-    if (slice == null) {
-      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
-      // a signal to use the slice of this core.
-      // TODO: what if this core is not in the targeted collection?
-      String shardId = cloudDesc.getShardId();
-      slice = coll.getSlice(shardId);
-      if (slice == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
-      }
+
+    private static void processDBQResults(SolrClient client, String collection, String uniqueField,
+                                          QueryResponse rsp)
+            throws SolrServerException, IOException {
+        SolrDocumentList results = rsp.getResults();
+        List<String> ids = new ArrayList<>(results.size());
+        results.forEach(entries -> {
+            String id = entries.getFirstValue(uniqueField).toString();
+            ids.add(id);
+        });
+        if (ids.size() > 0) {
+            client.deleteById(collection, ids);
+        }
     }
-    String shardId = slice.getName();
-    Replica leaderReplica = null;
-    try {
-      leaderReplica = req.getCore().getCoreContainer().getZkController().getZkStateReader().getLeaderRetry(collection, shardId);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+
+    private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
+        CloudDescriptor cloudDesc =
+                req.getCore().getCoreDescriptor().getCloudDescriptor();
+        String collection = cloudDesc.getCollectionName();
+        ClusterState clusterState =
+                req.getCore().getCoreContainer().getZkController().getClusterState();
+        DocCollection coll = clusterState.getCollection(collection);
+        Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
+        if (slice == null) {
+            // No slice found.  Most strict routers will have already thrown an exception, so a null return is
+            // a signal to use the slice of this core.
+            // TODO: what if this core is not in the targeted collection?
+            String shardId = cloudDesc.getShardId();
+            slice = coll.getSlice(shardId);
+            if (slice == null) {
+                throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+            }
+        }
+        String shardId = slice.getName();
+        Replica leaderReplica = null;
+        try {
+            leaderReplica = req.getCore().getCoreContainer().getZkController().getZkStateReader().getLeaderRetry(collection, shardId);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+        }
+        return leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
     }
-    return leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
-  }
-
-  @Override public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
-    super.processRollback(cmd);
-    // TODO: We can't/shouldn't support this ?
-  }
-
-  public void processCommit(CommitUpdateCommand cmd) throws IOException {
-    log.debug("process commit cmd={}", cmd);
-    if (next != null) next.processCommit(cmd);
-  }
-
-  @Override public final void finish() throws IOException {
-    super.finish();
-
-    if (doMirroring && mirrorRequest != null) {
-      // We are configured to mirror, but short-circuit on batches we already know will fail (because they cumulatively
-      // exceed the mirroring max-size)
-      if (mirrorRequestBytes > maxMirroringBatchSizeBytes) {
-        final String batchedIds = mirrorRequest.getDocuments().stream()
-                .map(doc -> doc.getField("id").getValue().toString())
-                .collect(Collectors.joining(", "));
-        log.warn("Mirroring skipped for request because batch size {} bytes exceeds limit {} bytes.  IDs: {}",
-                mirrorRequestBytes, maxMirroringBatchSizeBytes, batchedIds);
-        mirrorRequest = null;
-        mirrorRequestBytes = 0L;
-        return;
-      }
-
-      try {
-        requestMirroringHandler.mirror(mirrorRequest);
-        mirrorRequest = null; // so we don't accidentally submit it again
-      } catch (Exception e) {
-        log.error("mirror submit failed", e);
-        throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
-      }
+
+    @Override
+    public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+        super.processRollback(cmd);
+        // TODO: We can't/shouldn't support this ?
     }
-  }
 
-  // package private for testing
-  static class ObjectSizeEstimator {
-    /**
-     * Sizes of primitive classes.
-     */
-    private static final Map<Class<?>,Integer> primitiveSizes = new IdentityHashMap<>();
-    static {
-      primitiveSizes.put(boolean.class, 1);
-      primitiveSizes.put(Boolean.class, 1);
-      primitiveSizes.put(byte.class, 1);
-      primitiveSizes.put(Byte.class, 1);
-      primitiveSizes.put(char.class, Character.BYTES);
-      primitiveSizes.put(Character.class, Character.BYTES);
-      primitiveSizes.put(short.class, Short.BYTES);
-      primitiveSizes.put(Short.class, Short.BYTES);
-      primitiveSizes.put(int.class, Integer.BYTES);
-      primitiveSizes.put(Integer.class, Integer.BYTES);
-      primitiveSizes.put(float.class, Float.BYTES);
-      primitiveSizes.put(Float.class, Float.BYTES);
-      primitiveSizes.put(double.class, Double.BYTES);
-      primitiveSizes.put(Double.class, Double.BYTES);
-      primitiveSizes.put(long.class, Long.BYTES);
-      primitiveSizes.put(Long.class, Long.BYTES);
+    public void processCommit(CommitUpdateCommand cmd) throws IOException {
+        log.debug("process commit cmd={}", cmd);
+        if (next != null) next.processCommit(cmd);
     }
 
-    public static long estimate(SolrInputDocument doc) {
-      if (doc == null) return 0L;
-      long size = 0;
-      for (SolrInputField inputField : doc.values()) {
-        size += primitiveEstimate(inputField.getName(), 0L);
-        size += estimate(inputField.getValue());
-      }
-
-      if (doc.hasChildDocuments()) {
-        for (SolrInputDocument childDoc : doc.getChildDocuments()) {
-          size += estimate(childDoc);
+    @Override
+    public final void finish() throws IOException {
+        super.finish();
+
+        if (doMirroring && mirrorRequest != null) {
+            // We are configured to mirror, but short-circuit on batches we already know will fail (because they cumulatively
+            // exceed the mirroring max-size)
+            if (mirrorRequestBytes > maxMirroringBatchSizeBytes) {
+                final String batchedIds = mirrorRequest.getDocuments().stream()
+                        .map(doc -> doc.getField("id").getValue().toString())
+                        .collect(Collectors.joining(", "));
+                log.warn("Mirroring skipped for request because batch size {} bytes exceeds limit {} bytes.  IDs: {}",
+                        mirrorRequestBytes, maxMirroringBatchSizeBytes, batchedIds);
+                mirrorRequest = null;
+                mirrorRequestBytes = 0L;
+                return;
+            }
+
+            try {
+                requestMirroringHandler.mirror(mirrorRequest);
+                mirrorRequest = null; // so we don't accidentally submit it again
+            } catch (Exception e) {
+                log.error("mirror submit failed", e);
+                throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+            }
         }
-      }
-      return size;
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    static long estimate(Object obj) {
-      if (obj instanceof SolrInputDocument) {
-        return estimate((SolrInputDocument) obj);
-      }
+    // package private for testing
+    static class ObjectSizeEstimator {
+        /**
+         * Sizes of primitive classes.
+         */
+        private static final Map<Class<?>, Integer> primitiveSizes = new IdentityHashMap<>();
+
+        static {
+            primitiveSizes.put(boolean.class, 1);
+            primitiveSizes.put(Boolean.class, 1);
+            primitiveSizes.put(byte.class, 1);
+            primitiveSizes.put(Byte.class, 1);
+            primitiveSizes.put(char.class, Character.BYTES);
+            primitiveSizes.put(Character.class, Character.BYTES);
+            primitiveSizes.put(short.class, Short.BYTES);
+            primitiveSizes.put(Short.class, Short.BYTES);
+            primitiveSizes.put(int.class, Integer.BYTES);
+            primitiveSizes.put(Integer.class, Integer.BYTES);
+            primitiveSizes.put(float.class, Float.BYTES);
+            primitiveSizes.put(Float.class, Float.BYTES);
+            primitiveSizes.put(double.class, Double.BYTES);
+            primitiveSizes.put(Double.class, Double.BYTES);
+            primitiveSizes.put(long.class, Long.BYTES);
+            primitiveSizes.put(Long.class, Long.BYTES);
+        }
 
-      if (obj instanceof Map) {
-        return estimate((Map) obj);
-      }
+        public static long estimate(SolrInputDocument doc) {
+            if (doc == null) return 0L;
+            long size = 0;
+            for (SolrInputField inputField : doc.values()) {
+                size += primitiveEstimate(inputField.getName(), 0L);
+                size += estimate(inputField.getValue());
+            }
+
+            if (doc.hasChildDocuments()) {
+                for (SolrInputDocument childDoc : doc.getChildDocuments()) {
+                    size += estimate(childDoc);
+                }
+            }
+            return size;
+        }
 
-      if (obj instanceof Collection) {
-        return estimate((Collection) obj);
-      }
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        static long estimate(Object obj) {
+            if (obj instanceof SolrInputDocument) {
+                return estimate((SolrInputDocument) obj);
+            }
 
-      return primitiveEstimate(obj, 0L);
-    }
+            if (obj instanceof Map) {
+                return estimate((Map) obj);
+            }
 
-    private static long primitiveEstimate(Object obj, long def) {
-      Class<?> clazz = obj.getClass();
-      if (clazz.isPrimitive()) {
-        return primitiveSizes.get(clazz);
-      }
-      if (obj instanceof String) {
-        return ((String) obj).length() * Character.BYTES;
-      }
-      return def;
-    }
+            if (obj instanceof Collection) {
+                return estimate((Collection) obj);
+            }
 
-    private static long estimate(Map<Object, Object> map) {
-      if (map.isEmpty()) return 0;
-      long size = 0;
-      for (Map.Entry<Object, Object> entry : map.entrySet()) {
-        size += primitiveEstimate(entry.getKey(), 0L);
-        size += estimate(entry.getValue());
-      }
-      return size;
-    }
+            return primitiveEstimate(obj, 0L);
+        }
+
+        private static long primitiveEstimate(Object obj, long def) {
+            Class<?> clazz = obj.getClass();
+            if (clazz.isPrimitive()) {
+                return primitiveSizes.get(clazz);
+            }
+            if (obj instanceof String) {
+                return ((String) obj).length() * Character.BYTES;
+            }
+            return def;
+        }
 
-    private static long estimate(@SuppressWarnings({"rawtypes"})Collection collection) {
-      if (collection.isEmpty()) return 0;
-      long size = 0;
-      for (Object obj : collection) {
-        size += estimate(obj);
-      }
-      return size;
+        private static long estimate(Map<Object, Object> map) {
+            if (map.isEmpty()) return 0;
+            long size = 0;
+            for (Map.Entry<Object, Object> entry : map.entrySet()) {
+                size += primitiveEstimate(entry.getKey(), 0L);
+                size += estimate(entry.getValue());
+            }
+            return size;
+        }
+
+        private static long estimate(@SuppressWarnings({"rawtypes"}) Collection collection) {
+            if (collection.isEmpty()) return 0;
+            long size = 0;
+            for (Object obj : collection) {
+                size += estimate(obj);
+            }
+            return size;
+        }
     }
-  }
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 669bfa2..3f89bfe 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -249,8 +249,10 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
         }
 
+        String defaultDBQMethod = conf.get(KafkaCrossDcConf.DEFAULT_DBQ_METHOD);
+
         return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, maxMirroringBatchSizeBytes, mirroredParams,
-                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, defaultDBQMethod);
     }
 
     private static class NoOpUpdateRequestProcessor extends UpdateRequestProcessor {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java
new file mode 100644
index 0000000..b1e1d07
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java
@@ -0,0 +1,428 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {SolrIgnoredThreadsFilter.class,
+        QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
+@ThreadLeakLingering(linger = 5000)
+public class DeleteByQueryIntegrationTest extends
+        SolrTestCaseJ4 {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static final int MAX_MIRROR_BATCH_SIZE_BYTES = Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
+    private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+
+    static final String VERSION_FIELD = "_version_";
+
+    private static final int NUM_BROKERS = 1;
+    public static EmbeddedKafkaCluster kafkaCluster;
+
+    protected static volatile MiniSolrCloudCluster solrCluster1;
+    protected static volatile MiniSolrCloudCluster solrCluster2;
+
+    protected static volatile Consumer consumer = new Consumer();
+
+    private static String TOPIC = "topic1";
+
+    private static String COLLECTION = "collection1";
+    private static String ALT_COLLECTION = "collection2";
+
+    @BeforeClass
+    public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+        Properties config = new Properties();
+        config.put("unclean.leader.election.enable", "true");
+        config.put("enable.partition.eof", "false");
+
+        kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+            public String bootstrapServers() {
+                return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+            }
+        };
+        kafkaCluster.start();
+
+        kafkaCluster.createTopic(TOPIC, 1, 1);
+
+        System.setProperty("topicName", TOPIC);
+        System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+        solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+                getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+        CollectionAdminRequest.Create create =
+                CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+        solrCluster1.getSolrClient().request(create);
+        solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+        solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+        solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+                getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+        CollectionAdminRequest.Create create2 =
+                CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+        solrCluster2.getSolrClient().request(create2);
+        solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+        solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+        String bootstrapServers = kafkaCluster.bootstrapServers();
+        log.info("bootstrapServers={}", bootstrapServers);
+
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+        properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+        properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+        properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+        properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
+        consumer.start(properties);
+
+    }
+
+    @AfterClass
+    public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+        ObjectReleaseTracker.clear();
+
+        consumer.shutdown();
+
+        if (solrCluster1 != null) {
+            solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+            solrCluster1.shutdown();
+        }
+        if (solrCluster2 != null) {
+            solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+            solrCluster2.shutdown();
+        }
+
+        try {
+            kafkaCluster.stop();
+        } catch (Exception e) {
+            log.error("Exception stopping Kafka cluster", e);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        solrCluster1.deleteAllCollections();
+        solrCluster2.deleteAllCollections();
+        CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+        solrCluster1.getSolrClient().request(create);
+        solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+        solrCluster2.getSolrClient().request(create);
+        solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+    }
+
+    @Test
+    public void testDeleteById() throws Exception {
+        // Add document
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        SolrInputDocument doc1 = new SolrInputDocument();
+        String id1 = String.valueOf(System.currentTimeMillis());
+        doc1.addField("id", id1);
+        doc1.addField("text", "test text 1");
+        client1.add(doc1);
+        client1.commit(COLLECTION);
+
+        // Delete document by ID
+        client1.deleteById(COLLECTION, id1);
+
+        // Verify deletion in solrCluster2
+        assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id1);
+    }
+
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        // Add document
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        SolrInputDocument doc1 = new SolrInputDocument();
+        String id1 = String.valueOf(System.currentTimeMillis());
+        doc1.addField("id", id1);
+        doc1.addField("text", "test text 2");
+        client1.add(doc1);
+        client1.commit(COLLECTION);
+
+        // Delete document by query
+        client1.deleteByQuery(COLLECTION, "id:" + id1);
+
+        // Verify deletion in solrCluster2
+        assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id1);
+    }
+
+    @Test
+    public void testDeleteByQueryWithNoPaging() throws Exception {
+        // Add document
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        SolrInputDocument doc1 = new SolrInputDocument();
+        String id1 = String.valueOf(System.currentTimeMillis());
+        doc1.addField("id", id1);
+        doc1.addField("text", "test text 3");
+        client1.add(doc1);
+        client1.commit(COLLECTION);
+
+        // Set dbqMethod to convert_no_paging
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set("dbqMethod", "convert_no_paging");
+
+        // Delete document by query
+        UpdateRequest updateRequest = new UpdateRequest();
+        updateRequest.setParams(params);
+        updateRequest.deleteByQuery("id:" + id1);
+        updateRequest.process(client1, COLLECTION);
+
+        // Verify deletion in solrCluster2
+        assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id1);
+    }
+
+    @Test
+    public void testBulkDelete() throws Exception {
+        // Add several documents
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        List<String> ids = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            String id = String.valueOf(System.currentTimeMillis() + i);
+            ids.add(id);
+            doc.addField("id", id);
+            doc.addField("text", "test text " + i);
+            client1.add(doc);
+        }
+        client1.commit(COLLECTION);
+
+        // Delete documents by ID
+        client1.deleteById(COLLECTION, ids);
+
+        // Verify deletions in solrCluster2
+        for (String id : ids) {
+            assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id);
+        }
+    }
+
+    @Test
+    public void testConcurrentDeletes() throws Exception {
+        // Create several threads, each performing add and delete operations
+        int threadCount = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        for (int i = 0; i < threadCount; i++) {
+            final int threadId = i;
+            executor.submit(() -> {
+                try {
+                    // Perform add and delete operations
+                    CloudSolrClient client = solrCluster1.getSolrClient();
+                    String id = String.valueOf(System.currentTimeMillis()) + "-" + threadId;
+                    SolrInputDocument doc = new SolrInputDocument();
+                    doc.addField("id", id);
+                    doc.addField("text", "test text " + threadId);
+                    client.add(doc);
+                    client.commit(COLLECTION);
+
+                    // Delete the document
+                    client.deleteById(COLLECTION, id);
+
+                    // Check if the document is deleted in the second cluster
+                    assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+
+        // Shut down the executor and wait for all tasks to complete
+        executor.shutdown();
+        assertTrue("Tasks did not finish in time", executor.awaitTermination(5, TimeUnit.MINUTES));
+    }
+
+    @Test
+    public void testDeleteNonexistentDocument() throws Exception {
+        // Try to delete a document that doesn't exist
+        CloudSolrClient client = solrCluster1.getSolrClient();
+        String id = "nonexistent";
+        client.deleteById(COLLECTION, id);
+
+        // Check that the nonexistent document is still nonexistent in the second cluster
+        assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id);
+    }
+
+    @Test
+    public void testStressDelete() throws Exception {
+        // Create a large number of documents
+        int docCount = 250;
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        List<String> ids = new ArrayList<>(docCount);
+        for (int i = 0; i < docCount; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            String id = String.valueOf(System.currentTimeMillis() + i);
+            ids.add(id);
+            doc.addField("id", id);
+            doc.addField("text", "test text " + i);
+            client1.add(doc);
+        }
+        client1.commit(COLLECTION);
+
+        // Delete the documents
+        for (String id : ids) {
+            client1.deleteById(COLLECTION, id);
+        }
+
+        // Verify deletions in solrCluster2
+        for (String id : ids) {
+            assertCluster2EventuallyHasNoDocs(COLLECTION, "id:" + id);
+        }
+    }
+
+    @Test
+    public void testDeleteByQueryLocal() throws Exception {
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        CloudSolrClient client2 = solrCluster2.getSolrClient();
+
+        // Add a document
+        String id = String.valueOf(System.nanoTime());
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", id);
+        doc.addField("text", "test text");
+        client1.add(doc);
+        client1.commit(COLLECTION);
+
+        assertClusterEventuallyHasDocs(client2, COLLECTION, "id:" + id, 1);
+
+        // Delete the document locally using deleteByQuery with dbqMethod set to "delete_by_query_local"
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set("dbqMethod", "delete_by_query_local");
+        UpdateRequest request = new UpdateRequest();
+        request.setParams(params);
+        request.deleteByQuery("id:" + id);
+        request.process(client1, COLLECTION);
+        client1.commit(COLLECTION);
+
+        // The document should be deleted locally
+        assertClusterEventuallyHasDocs(client1, COLLECTION, "id:" + id, 0);
+
+        Thread.sleep(1500);
+        client2.commit(COLLECTION);
+        // The document should not be deleted in the second cluster
+        assertClusterEventuallyHasDocs(client2, COLLECTION, "id:" + id, 1);
+    }
+
+    @Test
+    public void testPassDeleteByQuery() throws Exception {
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        CloudSolrClient client2 = solrCluster2.getSolrClient();
+
+        // Add a document
+        String id = String.valueOf(System.currentTimeMillis());
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", id);
+        doc.addField("text", "test text");
+        client1.add(doc);
+        client1.commit(COLLECTION);
+
+        // Delete the document using deleteByQuery with dbqMethod set to "delete_by_query"
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set("dbqMethod", "delete_by_query");
+        UpdateRequest request = new UpdateRequest();
+        request.setParams(params);
+        request.deleteByQuery("id:" + id);
+        request.process(client1, COLLECTION);
+        client1.commit(COLLECTION);
+
+        // The document should be deleted in both clusters
+        assertClusterEventuallyHasDocs(client1, COLLECTION, "id:" + id, 0);
+        assertClusterEventuallyHasDocs(client2, COLLECTION, "id:" + id, 0);
+    }
+
+    @Test
+    public void testDeleteByQueryConvertNoPaging() throws Exception {
+        CloudSolrClient client1 = solrCluster1.getSolrClient();
+        CloudSolrClient client2 = solrCluster2.getSolrClient();
+
+        // Add a document
+        String id = String.valueOf(System.currentTimeMillis());
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", id);
+        doc.addField("text", "test text");
+        client1.add(doc);
+        client1.commit(COLLECTION);
+
+        // Delete the document using deleteByQuery with dbqMethod set to "convert_no_paging"
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set("dbqMethod", "convert_no_paging");
+        UpdateRequest request = new UpdateRequest();
+        request.setParams(params);
+        request.deleteByQuery("id:" + id);
+        request.process(client1, COLLECTION);
+        client1.commit(COLLECTION);
+
+        // The document should be deleted in both clusters
+        assertClusterEventuallyHasDocs(client1, COLLECTION, "id:" + id, 0);
+        assertClusterEventuallyHasDocs(client2, COLLECTION, "id:" + id, 0);
+    }
+
+    private void assertCluster2EventuallyHasNoDocs(String collection, String query) throws Exception {
+        assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, 0);
+    }
+
+    private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
+        assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs);
+    }
+
+    private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+        final String stashedDefault = client.getDefaultCollection();
+        try {
+            //client.setDefaultCollection(null);
+            client.request(createCmd);
+        } finally {
+            //client.setDefaultCollection(stashedDefault);
+        }
+    }
+
+    private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+        QueryResponse results = null;
+        boolean foundUpdates = false;
+        for (int i = 0; i < 500; i++) {
+            client.commit(collection);
+            results = client.query(collection, new SolrQuery(query));
+            if (results.getResults().getNumFound() == expectedNumDocs) {
+                foundUpdates = true;
+                break; // stop polling once the expected count is observed
+            } else {
+                Thread.sleep(100);
+            }
+        }
+
+        assertTrue("results=" + results, foundUpdates);
+    }
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index cf25ebe..5ec91d3 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -122,12 +122,6 @@ import java.util.Properties;
 
     consumer.shutdown();
 
-    try {
-      kafkaCluster.stop();
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -136,6 +130,12 @@ import java.util.Properties;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 8cbdae2..10e478c 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -142,12 +142,6 @@ import java.util.Properties;
 
     consumer.shutdown();
 
-    try {
-      kafkaCluster.stop();
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.shutdown();
     }
@@ -161,6 +155,13 @@ import java.util.Properties;
     if (zkTestServer2 != null) {
       zkTestServer2.shutdown();
     }
+
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 7cba50a..951fc81 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -129,12 +129,6 @@ import static org.mockito.Mockito.spy;
 
     consumer.shutdown();
 
-    try {
-      kafkaCluster.stop();
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -143,6 +137,12 @@ import static org.mockito.Mockito.spy;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index e25ac83..a6e7a76 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -107,14 +107,6 @@ import java.util.*;
 
     consumer.shutdown();
 
-    try {
-      if (kafkaCluster != null) {
-        kafkaCluster.stop();
-      }
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -123,6 +115,14 @@ import java.util.*;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+
+    try {
+      if (kafkaCluster != null) {
+        kafkaCluster.stop();
+      }
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index f6c8844..2646827 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -138,12 +138,6 @@ import java.util.Properties;
     consumer1.shutdown();
     consumer2.shutdown();
 
-    try {
-      kafkaCluster.stop();
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -152,6 +146,13 @@ import java.util.Properties;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
   }
 
   @After