Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/28 12:34:55 UTC
(solr-sandbox) branch main updated: Add support for optional DBQ expansion. (#91)
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new ef11a85 Add support for optional DBQ expansion. (#91)
ef11a85 is described below
commit ef11a8581fa7b877b964f79693632ac24e9bf399
Author: Andrzej Białecki <ab...@apache.org>
AuthorDate: Tue Nov 28 13:34:49 2023 +0100
Add support for optional DBQ expansion. (#91)
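
The new `expandDbq` property is parsed leniently: the value is matched case-insensitively, and a blank or unrecognized value falls back to the supplied default. The following is a minimal standalone sketch of that rule, not the committed code. The class name `ExpandDbqSketch` is hypothetical, and it uses `Enum.valueOf` where the commit uses a lookup map; the observable behavior is assumed to be the same.

```java
import java.util.Locale;

// Hypothetical standalone class; mirrors the parsing rule of
// CrossDcConf.ExpandDbq.getOrDefault from this commit.
class ExpandDbqSketch {

  enum ExpandDbq { NONE, EXPAND }

  // Returns the matching constant (case-insensitive), or defaultValue
  // when the input is null, blank, or not a known constant name.
  static ExpandDbq getOrDefault(String strValue, ExpandDbq defaultValue) {
    if (strValue == null || strValue.isBlank()) {
      return defaultValue;
    }
    try {
      // Assumption: valueOf replaces the HashMap lookup used in the commit.
      return ExpandDbq.valueOf(strValue.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    System.out.println(getOrDefault("none", ExpandDbq.EXPAND));   // NONE
    System.out.println(getOrDefault("Expand", ExpandDbq.NONE));   // EXPAND
    System.out.println(getOrDefault("true", ExpandDbq.EXPAND));   // EXPAND (unrecognized value)
    System.out.println(getOrDefault(null, ExpandDbq.EXPAND));     // EXPAND (missing value)
  }
}
```

One consequence of this rule is that a misspelled value such as "true" or "off" does not raise an error; it silently selects the default, which is `EXPAND` in `MirroringUpdateRequestProcessorFactory`.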
---
CROSSDC.md | 28 ++---
.../apache/solr/crossdc/common/CrossDcConf.java | 25 ++++
.../solr/crossdc/common/KafkaCrossDcConf.java | 3 +
.../org/apache/solr/crossdc/consumer/Consumer.java | 2 +-
.../crossdc/consumer/KafkaCrossDcConsumer.java | 4 +-
.../update/processor/MirroringUpdateProcessor.java | 18 ++-
.../MirroringUpdateRequestProcessorFactory.java | 3 +-
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 127 +++++++++++++++------
.../processor/MirroringUpdateProcessorTest.java | 34 +++++-
.../configs/cloud-minimal-no-dbq/conf/schema.xml | 54 +++++++++
.../conf/solrconfig-producerdisabled.xml | 119 +++++++++++++++++++
.../cloud-minimal-no-dbq/conf/solrconfig.xml | 123 ++++++++++++++++++++
12 files changed, 487 insertions(+), 53 deletions(-)
diff --git a/CROSSDC.md b/CROSSDC.md
index 53a0930..f4a2af9 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -78,20 +78,22 @@ Add the following line to `solr.xml`:
The required configuration properties are:
- `bootstrapServers`: list of servers used to connect to the Kafka cluster
-- `topicName`: Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed on
+- `topicName`: Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed on. This topic must already exist.
Optional configuration properties:
-- `batchSizeBytes`: maximum batch size in bytes for the Kafka queue
-- `bufferMemoryBytes`: memory allocated by the Producer in total for buffering
-- `lingerMs`: amount of time that the Producer will wait to add to a batch
-- `requestTimeout`: request timeout for the Producer
-- `indexUnmirrorableDocs`: if set to True, updates that are too large for the Kafka queue will still be indexed on the primary.
-- `enableDataCompression`: whether to use compression for data sent over the Kafka queue - can be none (default), gzip, snappy, lz4, or zstd
-- `numRetries`: Setting a value greater than zero will cause the Producer to resend any record whose send fails with a potentially transient error.
-- `retryBackoffMs`: The amount of time to wait before attempting to retry a failed request to a given topic partition.
-- `deliveryTimeoutMS`: Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first
-- `maxRequestSizeBytes`: The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
-- `mirrorCommits`: if "true" then standalone commit requests will be mirrored, otherwise they will be processed only locally.
+- `batchSizeBytes`: (integer) maximum batch size in bytes for the Kafka queue
+- `bufferMemoryBytes`: (integer) memory allocated by the Producer in total for buffering
+- `lingerMs`: (integer) amount of time that the Producer will wait to add to a batch
+- `requestTimeout`: (integer) request timeout for the Producer
+- `indexUnmirrorableDocs`: (boolean) if set to True, updates that are too large for the Kafka queue will still be indexed on the primary.
+- `enableDataCompression`: (string) whether to use compression for data sent over the Kafka queue - can be none (default), gzip, snappy, lz4, or zstd
+- `numRetries`: (integer) Setting a value greater than zero will cause the Producer to resend any record whose send fails with a potentially transient error.
+- `retryBackoffMs`: (integer) The amount of time to wait before attempting to retry a failed request to a given topic partition.
+- `deliveryTimeoutMS`: (integer) Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first
+- `maxRequestSizeBytes`: (integer) The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
+- `dlqTopicName`: (string) if not empty then requests that failed processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must exist if configured).
+- `mirrorCommits`: (boolean) if "true" then standalone commit requests will be mirrored, otherwise they will be processed only locally.
+- `expandDbq`: (enum) if set to "expand" (default) then Delete-By-Query will be expanded before mirroring into a series of Delete-By-Id requests, which may help with correct processing of out-of-order requests on the consumer side. If set to "none" then Delete-By-Query requests will be mirrored as-is.
#### CrossDC Consumer Application
@@ -130,5 +132,5 @@ To make the Cross DC UpdateProcessor optional in a common `solrconfig.xml`, use
## Limitations
-- Delete-By-Query converts to DeleteById, which can be much less efficient for queries matching large numbers of documents.
+- When the `expandDbq` property is set to "expand" (default), Delete-By-Query is converted to DeleteById, which can be much less efficient for queries matching large numbers of documents.
Forwarding a real Delete-By-Query could also be a reasonable option to add if it is not strictly reliant on not being reordered with other requests.
\ No newline at end of file
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index b34ae5b..4abe7a8 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -16,7 +16,32 @@
*/
package org.apache.solr.crossdc.common;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
public abstract class CrossDcConf {
public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
+ public static final String EXPAND_DBQ = "expandDbq";
+
+ public enum ExpandDbq {
+ NONE,
+ EXPAND;
+
+ private static final Map<String, ExpandDbq> valueMap = new HashMap<>();
+ static {
+ for (ExpandDbq value : values()) {
+ valueMap.put(value.name().toUpperCase(Locale.ROOT), value);
+ }
+ }
+
+ public static ExpandDbq getOrDefault(String strValue, ExpandDbq defaultValue) {
+ if (strValue == null || strValue.isBlank()) {
+ return defaultValue;
+ }
+ ExpandDbq value = valueMap.get(strValue.toUpperCase(Locale.ROOT));
+ return value != null ? value : defaultValue;
+ }
+ }
}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index eaafb0f..f94166e 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -63,6 +63,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
private static final String DEFAULT_MIRROR_COMMITS = "false";
+ private static final String DEFAULT_EXPAND_DBQ = ExpandDbq.EXPAND.name();
+
public static final String TOPIC_NAME = "topicName";
public static final String DLQ_TOPIC_NAME = "dlqTopicName";
@@ -161,6 +163,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(MIRROR_COLLECTIONS, DEFAULT_MIRROR_COLLECTIONS),
new ConfigProperty(MIRROR_COMMITS, DEFAULT_MIRROR_COMMITS),
+ new ConfigProperty(EXPAND_DBQ, DEFAULT_EXPAND_DBQ),
new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 7314226..c91ad79 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -104,7 +104,7 @@ public class Consumer {
}
}
- private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+ protected CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
return new KafkaCrossDcConsumer(conf, startLatch);
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index b711b56..71a7f46 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -48,7 +48,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final int maxAttempts;
private final SolrMessageProcessor messageProcessor;
- private final CloudSolrClient solrClient;
+ protected final CloudSolrClient solrClient;
private final ThreadPoolExecutor executor;
@@ -342,7 +342,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
- void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+ protected void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
switch (result.status()) {
case FAILED_RESUBMIT:
if (log.isTraceEnabled()) {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index c23b670..eb22b87 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -14,6 +14,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.*;
+import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
@@ -63,6 +64,11 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
*/
private final boolean mirrorCommits;
+ /**
+ * Controls the processing of Delete-By-Query requests.
+ */
+ private final CrossDcConf.ExpandDbq expandDbq;
+
private final long maxMirroringDocSizeBytes;
@@ -80,6 +86,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
boolean doMirroring,
final boolean indexUnmirrorableDocs,
final boolean mirrorCommits,
+ final CrossDcConf.ExpandDbq expandDbq,
final long maxMirroringBatchSizeBytes,
final SolrParams mirroredReqParams,
final DistributedUpdateProcessor.DistribPhase distribPhase,
@@ -89,6 +96,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
this.doMirroring = doMirroring;
this.indexUnmirrorableDocs = indexUnmirrorableDocs;
this.mirrorCommits = mirrorCommits;
+ this.expandDbq = expandDbq;
this.maxMirroringDocSizeBytes = maxMirroringBatchSizeBytes;
this.mirrorParams = mirroredReqParams;
this.distribPhase = distribPhase;
@@ -109,7 +117,9 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
- log.info("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+ if (log.isDebugEnabled()) {
+ log.debug("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+ }
producerMetrics.getDocumentSize().update(estimatedDocSizeInBytes);
final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringDocSizeBytes;
if (tooLargeForKafka && !indexUnmirrorableDocs) {
@@ -151,8 +161,9 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
log.debug("processAdd isLeader={} doMirroring={} tooLargeForKafka={} cmd={}", isLeader, doMirroring, tooLargeForKafka, cmd);
}
- @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
- if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+ @Override
+ public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+ if (doMirroring && (expandDbq != CrossDcConf.ExpandDbq.NONE) && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
CloudDescriptor cloudDesc =
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
@@ -165,6 +176,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+ // TODO: implement "expand without deep paging"
int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
String cursorMark = CursorMarkParams.CURSOR_MARK_START;
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 250ed26..13ce938 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -218,6 +218,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
// Check if mirroring is disabled in request params, defaults to true
boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
boolean mirrorCommits = conf.getBool(MIRROR_COMMITS);
+ ExpandDbq expandDbq = ExpandDbq.getOrDefault(conf.get(EXPAND_DBQ), ExpandDbq.EXPAND);
final long maxMirroringBatchSizeBytes = conf.getInt(MAX_REQUEST_SIZE_BYTES);
Boolean indexUnmirrorableDocs = conf.getBool(INDEX_UNMIRRORABLE_DOCS);
@@ -250,7 +251,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
}
- return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, maxMirroringBatchSizeBytes, mirroredParams,
+ return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, expandDbq, maxMirroringBatchSizeBytes, mirroredParams,
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, producerMetrics);
}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 140686d..b5f4bce 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -2,29 +2,40 @@ package org.apache.solr.crossdc;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer;
+import org.apache.solr.crossdc.consumer.PartitionManager;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
@@ -41,11 +52,14 @@ import java.util.Properties;
protected static volatile MiniSolrCloudCluster solrCluster1;
protected static volatile MiniSolrCloudCluster solrCluster2;
- protected static volatile Consumer consumer = new Consumer();
+ protected static volatile Consumer consumer;
+
+ protected static volatile List<MirroredSolrRequest> requests = new ArrayList<>();
private static String TOPIC = "topic1";
- private static String COLLECTION = "collection1";
+ private static String COLLECTION1 = "collection1";
+ private static String COLLECTION2 = "collection2";
@BeforeClass
public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
@@ -70,8 +84,10 @@ import java.util.Properties;
Properties props = new Properties();
- solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
- getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+ solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir())
+ .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+ .addConfig("confNoDbq", getFile("src/test/resources/configs/cloud-minimal-no-dbq/conf").toPath())
+ .configure();
props.setProperty("topicName", TOPIC);
props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
@@ -81,24 +97,34 @@ import java.util.Properties;
byte[] data = baos.toByteArray();
solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
- CollectionAdminRequest.Create create =
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
- solrCluster1.getSolrClient().request(create);
- solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+ CollectionAdminRequest.Create createSource1 =
+ CollectionAdminRequest.createCollection(COLLECTION1, "conf", 1, 1);
+ solrCluster1.getSolrClient().request(createSource1);
+ solrCluster1.waitForActiveCollection(COLLECTION1, 1, 1);
+ CollectionAdminRequest.Create createSource2 =
+ CollectionAdminRequest.createCollection(COLLECTION2, "confNoDbq", 1, 1);
+ solrCluster1.getSolrClient().request(createSource2);
+ solrCluster1.waitForActiveCollection(COLLECTION2, 1, 1);
- solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+ solrCluster1.getSolrClient().setDefaultCollection(COLLECTION1);
- solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
- getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+ solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir())
+ .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+ .addConfig("confNoDbq", getFile("src/test/resources/configs/cloud-minimal-no-dbq/conf").toPath())
+ .configure();
solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
- CollectionAdminRequest.Create create2 =
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
- solrCluster2.getSolrClient().request(create2);
- solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+ CollectionAdminRequest.Create createTarget1 =
+ CollectionAdminRequest.createCollection(COLLECTION1, "conf", 1, 1);
+ solrCluster2.getSolrClient().request(createTarget1);
+ solrCluster2.waitForActiveCollection(COLLECTION1, 1, 1);
- solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+ CollectionAdminRequest.Create createTarget2 =
+ CollectionAdminRequest.createCollection(COLLECTION2, "confNoDbq", 1, 1);
+ solrCluster2.getSolrClient().request(createTarget2);
+ solrCluster2.waitForActiveCollection(COLLECTION2, 1, 1);
+ solrCluster2.getSolrClient().setDefaultCollection(COLLECTION2);
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
@@ -108,8 +134,25 @@ import java.util.Properties;
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
- consumer.start(properties);
+ consumer = new Consumer() {
+ @Override
+ protected CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+ return new KafkaCrossDcConsumer(conf, startLatch) {
+ @Override
+ protected SolrMessageProcessor createSolrMessageProcessor() {
+ return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L) {
+ @Override
+ public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
+ requests.add(mirroredSolrRequest);
+ return super.handleItem(mirroredSolrRequest);
+ }
+ };
+ }
+ };
+ }
+ };
+ consumer.start(properties);
}
@AfterClass
@@ -146,54 +189,74 @@ import java.util.Properties;
solrCluster2.getSolrClient().deleteByQuery("*:*");
solrCluster1.getSolrClient().commit();
solrCluster2.getSolrClient().commit();
+ requests.clear();
}
@Test
- public void testDBQ() throws Exception {
+ public void testExpandDBQ() throws Exception {
- CloudSolrClient client = solrCluster1.getSolrClient();
+ List<SolrInputDocument> docs = new ArrayList<>();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(System.nanoTime()));
doc.addField("text", "some test");
- client.add(doc);
+ docs.add(doc);
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("id", String.valueOf(System.nanoTime()));
doc2.addField("text", "some test two");
- client.add(doc2);
+ docs.add(doc2);
SolrInputDocument doc3= new SolrInputDocument();
doc3.addField("id", String.valueOf(System.nanoTime()));
doc3.addField("text", "two of a kind");
- client.add(doc3);
+ docs.add(doc3);
SolrInputDocument doc4= new SolrInputDocument();
doc4.addField("id", String.valueOf(System.nanoTime()));
doc4.addField("text", "one two three");
- client.add(doc4);
+ docs.add(doc4);
- client.commit(COLLECTION);
+ CloudSolrClient client = solrCluster1.getSolrClient();
+ client.add(docs);
+ client.commit(COLLECTION1);
+ // add also to the other collection
+ client.add(COLLECTION2, docs);
+ client.commit(COLLECTION2);
- client.deleteByQuery("two");
+ client.deleteByQuery("text:two");
+ client.deleteByQuery(COLLECTION2, "text:two");
- client.commit(COLLECTION);
+ client.commit(COLLECTION1);
+ client.commit(COLLECTION2);
QueryResponse results = null;
boolean foundUpdates = false;
for (int i = 0; i < 50; i++) {
- solrCluster2.getSolrClient().commit(COLLECTION);
- solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
- results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ solrCluster2.getSolrClient().commit(COLLECTION1);
+ solrCluster1.getSolrClient().query(COLLECTION1, new SolrQuery("*:*"));
+ results = solrCluster2.getSolrClient().query(COLLECTION1, new SolrQuery("*:*"));
if (results.getResults().getNumFound() == 1) {
foundUpdates = true;
} else {
Thread.sleep(1000);
}
}
-
assertTrue("results=" + results, foundUpdates);
- System.out.println("Rest: " + results);
-
+ assertEquals("requests=" + requests, 4, requests.size());
+ UpdateRequest ureq = (UpdateRequest) requests.get(0).getSolrRequest();
+ assertEquals("update1/col1=" + ureq, COLLECTION1, ureq.getParams().get("collection"));
+ assertEquals("update1/col1=" + ureq, 4, ureq.getDocuments().size());
+ ureq = (UpdateRequest) requests.get(1).getSolrRequest();
+ assertEquals("update1/col2=" + ureq, COLLECTION2, ureq.getParams().get("collection"));
+ assertEquals("update1/col2=" + ureq, 4, ureq.getDocuments().size());
+ ureq = (UpdateRequest) requests.get(2).getSolrRequest();
+ assertEquals("update2/col1=" + ureq, COLLECTION1, ureq.getParams().get("collection"));
+ assertEquals("update2/col1.dbi=" + ureq, 3, ureq.getDeleteById() != null ? ureq.getDeleteById().size() : 0);
+ assertEquals("update2/col1.dbq=" + ureq, 0, ureq.getDeleteQuery() != null ? ureq.getDeleteQuery().size() : 0);
+ ureq = (UpdateRequest) requests.get(3).getSolrRequest();
+ assertEquals("update2/col2=" + ureq, COLLECTION2, ureq.getParams().get("collection"));
+ assertEquals("update2/col2.dbi=" + ureq, 0, ureq.getDeleteById() != null ? ureq.getDeleteById().size() : 0);
+ assertEquals("update2/col2.dbq=" + ureq, 1, ureq.getDeleteQuery() != null ? ureq.getDeleteQuery().size() : 0);
}
}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index 5f81f30..8a3618f 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -17,6 +17,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.schema.IndexSchema;
@@ -121,6 +122,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
true,
true,
true,
+ CrossDcConf.ExpandDbq.EXPAND,
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
@@ -253,6 +255,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
true,
true,
true,
+ CrossDcConf.ExpandDbq.EXPAND,
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
@@ -289,6 +292,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
true,
true,
false,
+ CrossDcConf.ExpandDbq.EXPAND,
1000L,
new ModifiableSolrParams(),
DistributedUpdateProcessor.DistribPhase.NONE,
@@ -333,7 +337,7 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
SolrParams mirrorParams = new ModifiableSolrParams();
MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
- true, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
+ true, CrossDcConf.ExpandDbq.EXPAND, 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler, producerMetrics);
assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
}
@@ -359,6 +363,34 @@ public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
verify(requestMirroringHandler, times(1)).mirror(any());
}
+ @Test
+ public void testExpandDbq() throws Exception {
+ when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ deleteUpdateCommand.query = "id:test*";
+ UpdateRequest updateRequest = new UpdateRequest();
+ processor =
+ new MirroringUpdateProcessor(
+ next,
+ true,
+ true,
+ true,
+ CrossDcConf.ExpandDbq.NONE,
+ 1000L,
+ new ModifiableSolrParams(),
+ DistributedUpdateProcessor.DistribPhase.NONE,
+ requestMirroringHandler,
+ producerMetrics) {
+ UpdateRequest createMirrorRequest() {
+ return updateRequest;
+ }
+ };
+
+ processor.processDelete(deleteUpdateCommand);
+ verify(requestMirroringHandler, times(1)).mirror(updateRequest);
+ assertEquals("missing dbq", 1, updateRequest.getDeleteQuery().size());
+ assertEquals("dbq value", "id:test*", updateRequest.getDeleteQuery().get(0));
+ }
+
@Test
public void testProcessDBQResults() throws Exception {
when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+ <fieldType name="string" class="solr.StrField" docValues="true"/>
+ <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="text" class="solr.TextField">
+ <analyzer>
+ <tokenizer class="solr.StandardTokenizerFactory"/>
+ <filter class="solr.LowerCaseFilterFactory"/>
+ </analyzer>
+ </fieldType>
+
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="text" type="text" indexed="true" stored="false"/>
+
+ <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+ <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+ <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+ <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+ <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+ <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+ <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+ <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml
new file mode 100644
index 0000000..8de9263
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig-producerdisabled.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <indexConfig>
+ <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+ <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+ <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+ <double name="noCFSRatio">${noCFSRatio:.1}</double>
+ </mergePolicyFactory>
+
+ <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+ <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+ <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs> <!-- Force the common case to flush by doc count -->
+ <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+ <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+ </mergeScheduler> -->
+
+ <writeLockTimeout>1000</writeLockTimeout>
+ <commitLockTimeout>10000</commitLockTimeout>
+
+ <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+ use the single process lockType for speed - but tests that explicitly need
+ to vary the lockType can set it as needed.
+ -->
+ <lockType>${lockType:single}</lockType>
+
+ <infoStream>${infostream:false}</infoStream>
+
+ </indexConfig>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <autoCommit>
+ <maxTime>${autoCommit.maxTime:60000}</maxTime>
+ </autoCommit>
+ <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+
+ <query>
+ <queryResultCache
+ enabled="${queryResultCache.enabled:false}"
+ class="${queryResultCache.class:solr.CaffeineCache}"
+ size="${queryResultCache.size:0}"
+ initialSize="${queryResultCache.initialSize:0}"
+ autowarmCount="${queryResultCache.autowarmCount:0}"/>
+ <documentCache
+ enabled="${documentCache.enabled:false}"
+ class="${documentCache.class:solr.CaffeineCache}"
+ size="${documentCache.size:0}"
+ initialSize="${documentCache.initialSize:0}"
+ autowarmCount="${documentCache.autowarmCount:0}"/>
+ <filterCache
+ enabled ="${filterCache.enabled:false}"
+ class="${filterCache.class:solr.CaffeineCache}"
+ size="${filterCache.size:1}"
+ initialSize="${filterCache.initialSize:1}"
+ autowarmCount="${filterCache.autowarmCount:0}"
+ async="${filterCache.async:false}"/>
+ <cache name="myPerSegmentCache"
+ enabled="${myPerSegmentCache.enabled:false}"
+ class="${myPerSegmentCache.class:solr.CaffeineCache}"
+ size="${myPerSegmentCache.size:0}"
+ initialSize="${myPerSegmentCache.initialSize:0}"
+ autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+ </query>
+
+ <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
+ <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+ <bool name="enabled">false</bool>
+ </processor>
+ <processor class="solr.LogUpdateProcessorFactory" />
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+</config>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml
new file mode 100644
index 0000000..c6b950e
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal-no-dbq/conf/solrconfig.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <indexConfig>
+ <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+ <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+ <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+ <double name="noCFSRatio">${noCFSRatio:.1}</double>
+ </mergePolicyFactory>
+
+ <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+ <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+ <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs> <!-- Force the common case to flush by doc count -->
+ <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+ <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+ </mergeScheduler> -->
+
+ <writeLockTimeout>1000</writeLockTimeout>
+ <commitLockTimeout>10000</commitLockTimeout>
+
+ <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+ use the single process lockType for speed - but tests that explicitly need
+ to vary the lockType can set it as needed.
+ -->
+ <lockType>${lockType:single}</lockType>
+
+ <infoStream>${infostream:false}</infoStream>
+
+ </indexConfig>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <autoCommit>
+ <maxTime>${autoCommit.maxTime:60000}</maxTime>
+ </autoCommit>
+ <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+
+ <query>
+ <queryResultCache
+ enabled="${queryResultCache.enabled:false}"
+ class="${queryResultCache.class:solr.CaffeineCache}"
+ size="${queryResultCache.size:0}"
+ initialSize="${queryResultCache.initialSize:0}"
+ autowarmCount="${queryResultCache.autowarmCount:0}"/>
+ <documentCache
+ enabled="${documentCache.enabled:false}"
+ class="${documentCache.class:solr.CaffeineCache}"
+ size="${documentCache.size:0}"
+ initialSize="${documentCache.initialSize:0}"
+ autowarmCount="${documentCache.autowarmCount:0}"/>
+ <filterCache
+ enabled ="${filterCache.enabled:false}"
+ class="${filterCache.class:solr.CaffeineCache}"
+ size="${filterCache.size:1}"
+ initialSize="${filterCache.initialSize:1}"
+ autowarmCount="${filterCache.autowarmCount:0}"
+ async="${filterCache.async:false}"/>
+ <cache name="myPerSegmentCache"
+ enabled="${myPerSegmentCache.enabled:false}"
+ class="${myPerSegmentCache.class:solr.CaffeineCache}"
+ size="${myPerSegmentCache.size:0}"
+ initialSize="${myPerSegmentCache.initialSize:0}"
+ autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+ </query>
+
+ <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
+ <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+ <bool name="enabled">${enabled:true}</bool>
+ <bool name="indexUnmirrorableDocs">${indexUnmirrorableDocs:false}</bool>
+ <str name="expandDbq">${expandDbq:none}</str>
+ <str name="bootstrapServers">${bootstrapServers:}</str>
+ <str name="topicName">${topicName:}</str>
+ </processor>
+ <processor class="solr.LogUpdateProcessorFactory" />
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+
+</config>