Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/28 12:34:55 UTC

(solr-sandbox) branch main updated: Add support for optional DBQ expansion. (#91)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new ef11a85  Add support for optional DBQ expansion. (#91)
ef11a85 is described below

commit ef11a8581fa7b877b964f79693632ac24e9bf399
Author: Andrzej BiaƂecki <ab...@apache.org>
AuthorDate: Tue Nov 28 13:34:49 2023 +0100

    Add support for optional DBQ expansion. (#91)
---
 CROSSDC.md                                         |  28 ++---
 .../apache/solr/crossdc/common/CrossDcConf.java    |  25 ++++
 .../solr/crossdc/common/KafkaCrossDcConf.java      |   3 +
 .../org/apache/solr/crossdc/consumer/Consumer.java |   2 +-
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |   4 +-
 .../update/processor/MirroringUpdateProcessor.java |  18 ++-
 .../MirroringUpdateRequestProcessorFactory.java    |   3 +-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java | 127 +++++++++++++++------
 .../processor/MirroringUpdateProcessorTest.java    |  34 +++++-
 .../configs/cloud-minimal-no-dbq/conf/schema.xml   |  54 +++++++++
 .../conf/solrconfig-producerdisabled.xml           | 119 +++++++++++++++++++
 .../cloud-minimal-no-dbq/conf/solrconfig.xml       | 123 ++++++++++++++++++++
 12 files changed, 487 insertions(+), 53 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index 53a0930..f4a2af9 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -78,20 +78,22 @@ Add the following line to `solr.xml`:
 
 The required configuration properties are:
 - `bootstrapServers`: list of servers used to connect to the Kafka cluster
-- `topicName`: Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed on
+- `topicName`: name of the Kafka topic (queue) that the Solr updates will be pushed to. This topic must already exist.
 
 Optional configuration properties:
-- `batchSizeBytes`: maximum batch size in bytes for the Kafka queue
-- `bufferMemoryBytes`: memory allocated by the Producer in total for buffering 
-- `lingerMs`: amount of time that the Producer will wait to add to a batch
-- `requestTimeout`: request timeout for the Producer 
-- `indexUnmirrorableDocs`: if set to True, updates that are too large for the Kafka queue will still be indexed on the primary.
-- `enableDataCompression`: whether to use compression for data sent over the Kafka queue - can be none (default), gzip, snappy, lz4, or zstd
-- `numRetries`: Setting a value greater than zero will cause the Producer to resend any record whose send fails with a potentially transient error.
-- `retryBackoffMs`: The amount of time to wait before attempting to retry a failed request to a given topic partition.
-- `deliveryTimeoutMS`: Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first
-- `maxRequestSizeBytes`: The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
-- `mirrorCommits`: if "true" then standalone commit requests will be mirrored, otherwise they will be processed only locally.
+- `batchSizeBytes`: (integer) maximum batch size in bytes for the Kafka queue
+- `bufferMemoryBytes`: (integer) memory allocated by the Producer in total for buffering 
+- `lingerMs`: (integer) amount of time the Producer will wait for additional records to accumulate before sending a batch
+- `requestTimeout`: (integer) request timeout for the Producer 
+- `indexUnmirrorableDocs`: (boolean) if set to true, updates that are too large for the Kafka queue will still be indexed on the primary.
+- `enableDataCompression`: (enum) compression to use for data sent over the Kafka queue - one of none (default), gzip, snappy, lz4, or zstd
+- `numRetries`: (integer) Setting a value greater than zero will cause the Producer to resend any record whose send fails with a potentially transient error.
+- `retryBackoffMs`: (integer) The amount of time to wait before attempting to retry a failed request to a given topic partition.
+- `deliveryTimeoutMS`: (integer) Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first
+- `maxRequestSizeBytes`: (integer) The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
+- `dlqTopicName`: (string) if not empty, requests that fail processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (the topic must already exist if configured).
+- `mirrorCommits`: (boolean) if "true" then standalone commit requests will be mirrored, otherwise they will be processed only locally.
+- `expandDbq`: (enum) if set to "expand" (default), each Delete-By-Query request is expanded before mirroring into a series of Delete-By-Id requests, which may help with correct processing of out-of-order requests on the consumer side. If set to "none", Delete-By-Query requests are mirrored as-is. A sample configuration is sketched after this diff.
 
 #### CrossDC Consumer Application
 
@@ -130,5 +132,5 @@ To make the Cross DC UpdateProcessor optional in a common `solrconfig.xml`, use
 
 ## Limitations
 
-- Delete-By-Query converts to DeleteById, which can be much less efficient for queries matching large numbers of documents.
+- When the `expandDbq` property is set to `expand` (the default), Delete-By-Query converts to Delete-By-Id, which can be much less efficient for queries matching large numbers of documents.
   Forwarding a real Delete-By-Query could also be a reasonable option to add if it is not strictly reliant on not being reordered with other requests.
\ No newline at end of file
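
For reference, here is a minimal `crossdc.properties` that exercises the options documented above. Only `bootstrapServers` and `topicName` are required; the hostnames, topic names, and values shown are illustrative, not recommendations:

    # Required - the topic must already exist in Kafka
    bootstrapServers=kafka1:9092,kafka2:9092
    topicName=solr-mirror-updates

    # Optional - mirror standalone commits; keep DBQ expansion enabled (the default)
    mirrorCommits=true
    expandDbq=expand

    # Optional - send requests that fail processing maxAttempts times to a dead letter queue
    dlqTopicName=solr-mirror-dlq
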
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index b34ae5b..4abe7a8 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -16,7 +16,32 @@
  */
 package org.apache.solr.crossdc.common;
 
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
 public abstract class CrossDcConf {
     public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
     public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
+    public static final String EXPAND_DBQ = "expandDbq";
+
+    public enum ExpandDbq {
+        NONE,
+        EXPAND;
+
+        private static final Map<String, ExpandDbq> valueMap = new HashMap<>();
+        static {
+            for (ExpandDbq value : values()) {
+                valueMap.put(value.name().toUpperCase(Locale.ROOT), value);
+            }
+        }
+
+        public static ExpandDbq getOrDefault(String strValue, ExpandDbq defaultValue) {
+            if (strValue == null || strValue.isBlank()) {
+                return defaultValue;
+            }
+            ExpandDbq value = valueMap.get(strValue.toUpperCase(Locale.ROOT));
+            return value != null ? value : defaultValue;
+        }
+    }
 }
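
A quick sketch of how the new enum parsing behaves: lookups are case-insensitive, and null, blank, or unrecognized strings fall back to the supplied default. The class name `ExpandDbqParsingDemo` is invented for illustration:

    import org.apache.solr.crossdc.common.CrossDcConf.ExpandDbq;

    public class ExpandDbqParsingDemo {
        public static void main(String[] args) {
            // Case-insensitive lookup of a configured value:
            System.out.println(ExpandDbq.getOrDefault("expand", ExpandDbq.EXPAND)); // EXPAND
            System.out.println(ExpandDbq.getOrDefault("None", ExpandDbq.EXPAND));   // NONE
            // Null, blank, and unknown values all return the default:
            System.out.println(ExpandDbq.getOrDefault(null, ExpandDbq.EXPAND));     // EXPAND
            System.out.println(ExpandDbq.getOrDefault("  ", ExpandDbq.NONE));       // NONE
            System.out.println(ExpandDbq.getOrDefault("bogus", ExpandDbq.EXPAND));  // EXPAND
        }
    }

This is why the factory change further below can pass `conf.get(EXPAND_DBQ)` straight in without any pre-validation.
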
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index eaafb0f..f94166e 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -63,6 +63,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   private static final String DEFAULT_MIRROR_COMMITS = "false";
 
+  private static final String DEFAULT_EXPAND_DBQ = ExpandDbq.EXPAND.name();
+
   public static final String TOPIC_NAME = "topicName";
 
   public static final String DLQ_TOPIC_NAME = "dlqTopicName";
@@ -161,6 +163,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
             new ConfigProperty(MIRROR_COLLECTIONS, DEFAULT_MIRROR_COLLECTIONS),
             new ConfigProperty(MIRROR_COMMITS, DEFAULT_MIRROR_COMMITS),
+            new ConfigProperty(EXPAND_DBQ, DEFAULT_EXPAND_DBQ),
 
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 7314226..c91ad79 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -104,7 +104,7 @@ public class Consumer {
         }
     }
 
-    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+    protected CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
         return new KafkaCrossDcConsumer(conf, startLatch);
     }
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index b711b56..71a7f46 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -48,7 +48,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private final int maxAttempts;
   private final SolrMessageProcessor messageProcessor;
 
-  private final CloudSolrClient solrClient;
+  protected final CloudSolrClient solrClient;
 
   private final ThreadPoolExecutor executor;
 
@@ -342,7 +342,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
 
 
-  void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+  protected void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
     switch (result.status()) {
       case FAILED_RESUBMIT:
         if (log.isTraceEnabled()) {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index c23b670..eb22b87 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -14,6 +14,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.*;
+import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
@@ -63,6 +64,11 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
    */
   private final boolean mirrorCommits;
 
+  /**
+   * Controls the processing of Delete-By-Query requests.
+   */
+  private final CrossDcConf.ExpandDbq expandDbq;
+
   private final long maxMirroringDocSizeBytes;
 
 
@@ -80,6 +86,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
                                   boolean doMirroring,
       final boolean indexUnmirrorableDocs,
       final boolean mirrorCommits,
+      final CrossDcConf.ExpandDbq expandDbq,
       final long maxMirroringBatchSizeBytes,
       final SolrParams mirroredReqParams,
       final DistributedUpdateProcessor.DistribPhase distribPhase,
@@ -89,6 +96,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     this.doMirroring = doMirroring;
     this.indexUnmirrorableDocs = indexUnmirrorableDocs;
     this.mirrorCommits = mirrorCommits;
+    this.expandDbq = expandDbq;
     this.maxMirroringDocSizeBytes = maxMirroringBatchSizeBytes;
     this.mirrorParams = mirroredReqParams;
     this.distribPhase = distribPhase;
@@ -109,7 +117,9 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
     doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
     final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
-    log.info("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+    if (log.isDebugEnabled()) {
+      log.debug("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+    }
     producerMetrics.getDocumentSize().update(estimatedDocSizeInBytes);
     final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringDocSizeBytes;
     if (tooLargeForKafka && !indexUnmirrorableDocs) {
@@ -151,8 +161,9 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
       log.debug("processAdd isLeader={} doMirroring={} tooLargeForKafka={} cmd={}", isLeader, doMirroring, tooLargeForKafka, cmd);
   }
 
-  @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
-    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+  @Override
+  public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    if (doMirroring && (expandDbq != CrossDcConf.ExpandDbq.NONE) && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
 
       CloudDescriptor cloudDesc =
           cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
@@ -165,6 +176,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
         String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
 
+        // TODO: implement "expand without deep paging"
         int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
         SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
         String cursorMark = CursorMarkParams.CURSOR_MARK_START;
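
The hunk ends just before the paging loop, so for context here is the standard SolrJ cursorMark pattern that this setup feeds into. It is a standalone sketch (the class and method names are invented), assuming only a SolrJ `SolrClient`; the actual processor converts the matched IDs into mirrored Delete-By-Id requests rather than merely collecting them:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.params.CursorMarkParams;

    public class CursorMarkSketch {
      /** Collects the unique keys matched by a query, one page at a time. */
      static List<String> collectIds(SolrClient client, String collection,
                                     String query, String uniqueField) throws Exception {
        // Cursor-based paging requires a deterministic sort on the unique key.
        SolrQuery q = new SolrQuery(query)
            .setRows(1000)
            .setSort(SolrQuery.SortClause.asc(uniqueField))
            .setFields(uniqueField);
        List<String> ids = new ArrayList<>();
        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
        while (true) {
          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
          QueryResponse rsp = client.query(collection, q);
          for (SolrDocument doc : rsp.getResults()) {
            ids.add(String.valueOf(doc.getFirstValue(uniqueField)));
          }
          String next = rsp.getNextCursorMark();
          if (cursorMark.equals(next)) {
            break; // the cursor stops advancing once all pages are consumed
          }
          cursorMark = next;
        }
        return ids;
      }
    }
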
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 250ed26..13ce938 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -218,6 +218,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         // Check if mirroring is disabled in request params, defaults to true
         boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
         boolean mirrorCommits = conf.getBool(MIRROR_COMMITS);
+        ExpandDbq expandDbq = ExpandDbq.getOrDefault(conf.get(EXPAND_DBQ), ExpandDbq.EXPAND);
         final long maxMirroringBatchSizeBytes = conf.getInt(MAX_REQUEST_SIZE_BYTES);
         Boolean indexUnmirrorableDocs = conf.getBool(INDEX_UNMIRRORABLE_DOCS);
 
@@ -250,7 +251,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
         }
 
-        return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, maxMirroringBatchSizeBytes, mirroredParams,
+        return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, expandDbq, maxMirroringBatchSizeBytes, mirroredParams,
                 DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, producerMetrics);
     }
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 140686d..b5f4bce 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -2,29 +2,40 @@ package org.apache.solr.crossdc;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer;
+import org.apache.solr.crossdc.consumer.PartitionManager;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
     QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
@@ -41,11 +52,14 @@ import java.util.Properties;
   protected static volatile MiniSolrCloudCluster solrCluster1;
   protected static volatile MiniSolrCloudCluster solrCluster2;
 
-  protected static volatile Consumer consumer = new Consumer();
+  protected static volatile Consumer consumer;
+
+  protected static volatile List<MirroredSolrRequest> requests = new ArrayList<>();
 
   private static String TOPIC = "topic1";
 
-  private static String COLLECTION = "collection1";
+  private static String COLLECTION1 = "collection1";
+  private static String COLLECTION2 = "collection2";
 
   @BeforeClass
   public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
@@ -70,8 +84,10 @@ import java.util.Properties;
 
     Properties props = new Properties();
 
-    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
-        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir())
+        .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+        .addConfig("confNoDbq", getFile("src/test/resources/configs/cloud-minimal-no-dbq/conf").toPath())
+        .configure();
 
     props.setProperty("topicName", TOPIC);
     props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
@@ -81,24 +97,34 @@ import java.util.Properties;
     byte[] data = baos.toByteArray();
     solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
 
-    CollectionAdminRequest.Create create =
-        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
-    solrCluster1.getSolrClient().request(create);
-    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+    CollectionAdminRequest.Create createSource1 =
+        CollectionAdminRequest.createCollection(COLLECTION1, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(createSource1);
+    solrCluster1.waitForActiveCollection(COLLECTION1, 1, 1);
+    CollectionAdminRequest.Create createSource2 =
+        CollectionAdminRequest.createCollection(COLLECTION2, "confNoDbq", 1, 1);
+    solrCluster1.getSolrClient().request(createSource2);
+    solrCluster1.waitForActiveCollection(COLLECTION2, 1, 1);
 
-    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION1);
 
-    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
-        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir())
+        .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+        .addConfig("confNoDbq", getFile("src/test/resources/configs/cloud-minimal-no-dbq/conf").toPath())
+        .configure();
 
     solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
 
-    CollectionAdminRequest.Create create2 =
-        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
-    solrCluster2.getSolrClient().request(create2);
-    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+    CollectionAdminRequest.Create createTarget1 =
+        CollectionAdminRequest.createCollection(COLLECTION1, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(createTarget1);
+    solrCluster2.waitForActiveCollection(COLLECTION1, 1, 1);
 
-    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+    CollectionAdminRequest.Create createTarget2 =
+        CollectionAdminRequest.createCollection(COLLECTION2, "confNoDbq", 1, 1);
+    solrCluster2.getSolrClient().request(createTarget2);
+    solrCluster2.waitForActiveCollection(COLLECTION2, 1, 1);
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION2);
 
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
@@ -108,8 +134,25 @@ import java.util.Properties;
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
     properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
-    consumer.start(properties);
 
+    consumer = new Consumer() {
+      @Override
+      protected CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+        return new KafkaCrossDcConsumer(conf, startLatch) {
+          @Override
+          protected SolrMessageProcessor createSolrMessageProcessor() {
+            return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L) {
+              @Override
+              public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
+                requests.add(mirroredSolrRequest);
+                return super.handleItem(mirroredSolrRequest);
+              }
+            };
+          }
+        };
+      }
+    };
+    consumer.start(properties);
   }
 
   @AfterClass
@@ -146,54 +189,74 @@ import java.util.Properties;
     solrCluster2.getSolrClient().deleteByQuery("*:*");
     solrCluster1.getSolrClient().commit();
     solrCluster2.getSolrClient().commit();
+    requests.clear();
   }
 
   @Test
-  public void testDBQ() throws Exception {
+  public void testExpandDBQ() throws Exception {
 
-    CloudSolrClient client = solrCluster1.getSolrClient();
+    List<SolrInputDocument> docs = new ArrayList<>();
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField("id", String.valueOf(System.nanoTime()));
     doc.addField("text", "some test");
-    client.add(doc);
+    docs.add(doc);
 
     SolrInputDocument doc2 = new SolrInputDocument();
     doc2.addField("id", String.valueOf(System.nanoTime()));
     doc2.addField("text", "some test two");
-    client.add(doc2);
+    docs.add(doc2);
 
     SolrInputDocument doc3= new SolrInputDocument();
     doc3.addField("id", String.valueOf(System.nanoTime()));
     doc3.addField("text", "two of a kind");
-    client.add(doc3);
+    docs.add(doc3);
 
     SolrInputDocument doc4= new SolrInputDocument();
     doc4.addField("id", String.valueOf(System.nanoTime()));
     doc4.addField("text", "one two three");
-    client.add(doc4);
+    docs.add(doc4);
 
-    client.commit(COLLECTION);
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    client.add(docs);
+    client.commit(COLLECTION1);
+    // add also to the other collection
+    client.add(COLLECTION2, docs);
+    client.commit(COLLECTION2);
 
-    client.deleteByQuery("two");
+    client.deleteByQuery("text:two");
+    client.deleteByQuery(COLLECTION2, "text:two");
 
-    client.commit(COLLECTION);
+    client.commit(COLLECTION1);
+    client.commit(COLLECTION2);
 
     QueryResponse results = null;
     boolean foundUpdates = false;
     for (int i = 0; i < 50; i++) {
-      solrCluster2.getSolrClient().commit(COLLECTION);
-      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      solrCluster2.getSolrClient().commit(COLLECTION1);
+      solrCluster1.getSolrClient().query(COLLECTION1, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION1, new SolrQuery("*:*"));
       if (results.getResults().getNumFound() == 1) {
         foundUpdates = true;
       } else {
         Thread.sleep(1000);
       }
     }
-
     assertTrue("results=" + results, foundUpdates);
-    System.out.println("Rest: " + results);
-
+    assertEquals("requests=" + requests, 4, requests.size());
+    UpdateRequest ureq = (UpdateRequest) requests.get(0).getSolrRequest();
+    assertEquals("update1/col1=" + ureq, COLLECTION1, ureq.getParams().get("collection"));
+    assertEquals("update1/col1=" + ureq, 4, ureq.getDocuments().size());
+    ureq = (UpdateRequest) requests.get(1).getSolrRequest();
+    assertEquals("update1/col2=" + ureq, COLLECTION2, ureq.getParams().get("collection"));
+    assertEquals("update1/col2=" + ureq, 4, ureq.getDocuments().size());    ureq = (UpdateRequest) requests.get(1).getSolrRequest();
+    ureq = (UpdateRequest) requests.get(2).getSolrRequest();
+    assertEquals("update2/col1=" + ureq, COLLECTION1, ureq.getParams().get("collection"));
+    assertEquals("update2/col1.dbi=" + ureq, 3, ureq.getDeleteById() != null ? ureq.getDeleteById().size() : 0);
+    assertEquals("update2/col1.dbq=" + ureq, 0, ureq.getDeleteQuery() != null ? ureq.getDeleteQuery().size() : 0);
+    ureq = (UpdateRequest) requests.get(3).getSolrRequest();
+    assertEquals("update2/col2=" + ureq, COLLECTION2, ureq.getParams().get("collection"));
+    assertEquals("update2/col2.dbi=" + ureq, 0, ureq.getDeleteById() != null ? ureq.getDeleteById().size() : 0);
+    assertEquals("update2/col2.dbq=" + ureq, 1, ureq.getDeleteQuery() != null ? ureq.getDeleteQuery().size() : 0);
   }
 
 }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index 5f81f30..8a3618f 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -17,6 +17,7 @@ import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequestBase;
 import org.apache.solr.schema.IndexSchema;
@@ -121,6 +122,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                         true,
                         true,
                         true,
+                        CrossDcConf.ExpandDbq.EXPAND,
                         1000L,
                         new ModifiableSolrParams(),
                         DistributedUpdateProcessor.DistribPhase.NONE,
@@ -253,6 +255,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                     true,
                     true,
                     true,
+                    CrossDcConf.ExpandDbq.EXPAND,
                     1000L,
                     new ModifiableSolrParams(),
                     DistributedUpdateProcessor.DistribPhase.NONE,
@@ -289,6 +292,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
                     true,
                     true,
                     false,
+                    CrossDcConf.ExpandDbq.EXPAND,
                     1000L,
                     new ModifiableSolrParams(),
                     DistributedUpdateProcessor.DistribPhase.NONE,
@@ -333,7 +337,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
 
         SolrParams mirrorParams = new ModifiableSolrParams();
         MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
-                true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
+                true, CrossDcConf.ExpandDbq.EXPAND, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
 
         assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
     }
@@ -359,6 +363,34 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
         verify(requestMirroringHandler, times(1)).mirror(any());
     }
 
+    @Test
+    public void testExpandDbq() throws Exception {
+        when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        deleteUpdateCommand.query = "id:test*";
+        UpdateRequest updateRequest = new UpdateRequest();
+        processor =
+            new MirroringUpdateProcessor(
+                next,
+                true,
+                true,
+                true,
+                CrossDcConf.ExpandDbq.NONE,
+                1000L,
+                new ModifiableSolrParams(),
+                DistributedUpdateProcessor.DistribPhase.NONE,
+                requestMirroringHandler,
+                producerMetrics) {
+                UpdateRequest createMirrorRequest() {
+                    return updateRequest;
+                }
+            };
+
+        processor.processDelete(deleteUpdateCommand);
+        verify(requestMirroringHandler, times(1)).mirror(updateRequest);
+        assertEquals("missing dbq", 1, updateRequest.getDeleteQuery().size());
+        assertEquals("dbq value", "id:test*", updateRequest.getDeleteQuery().get(0));
+    }
+
     @Test
     public void testProcessDBQResults() throws Exception {
         when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml
new file mode 100644
index 0000000..8de9263
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+  <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+    <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+      <bool name="enabled">false</bool>
+    </processor>
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml
new file mode 100644
index 0000000..c6b950e
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+  <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+    <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+      <bool name="enabled">${enabled:true}</bool>
+      <bool name="indexUnmirrorableDocs">${indexUnmirrorableDocs:false}</bool>
+      <str name="expandDbq">${expandDbq:none}</str>
+      <str name="bootstrapServers">${bootstrapServers:}</str>
+      <str name="topicName">${topicName:}</str>
+    </processor>
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>
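
A note on the `${expandDbq:none}` syntax in the chain above: Solr resolves these placeholders against JVM system properties, so this test config defaults to "none" but can be flipped without editing the XML, e.g. by passing `-DexpandDbq=expand` to the JVM (on a regular Solr node this is typically done via `SOLR_OPTS` in `solr.in.sh`):

    # Illustrative only - override the config's default of "none" node-wide
    SOLR_OPTS="$SOLR_OPTS -DexpandDbq=expand"
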