You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/06 13:40:59 UTC
(solr-sandbox) branch main updated: Fix missing support for other request types. (#81)
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 9575cd0 Fix missing support for other request types. (#81)
9575cd0 is described below
commit 9575cd04018aecd18e269f7a61fd8178f436e4a2
Author: Andrzej Białecki <ab...@apache.org>
AuthorDate: Mon Nov 6 14:40:54 2023 +0100
Fix missing support for other request types. (#81)
---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 83 +++++++++++++---------
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 46 +++++++++---
2 files changed, 85 insertions(+), 44 deletions(-)
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 63939ff..2bf9f7a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -7,6 +7,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
@@ -191,7 +192,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.trace("poll return {} records", records.count());
}
- UpdateRequest solrReqBatch = null;
+ UpdateRequest updateReqBatch = null;
ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
@@ -203,9 +204,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
try {
- ModifiableSolrParams lastParams = null;
- NamedList lastParamsAsNamedList = null;
- solrReqBatch = new UpdateRequest();
+ ModifiableSolrParams lastUpdateParams = null;
+ NamedList lastUpdateParamsAsNamedList = null;
for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : partitionRecords) {
if (log.isTraceEnabled()) {
log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(),
@@ -214,48 +214,62 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
lastRecord = requestRecord;
MirroredSolrRequest req = requestRecord.value();
- UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
- ModifiableSolrParams params = solrReq.getParams();
+ SolrRequest solrReq = req.getSolrRequest();
+ MirroredSolrRequest.Type type = req.getType();
+ ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
if (log.isTraceEnabled()) {
- log.trace("params={}", params);
+ log.trace("-- picked type={}, params={}", req.getType(), params);
}
- if (lastParams != null && !lastParams.toNamedList().equals(params.toNamedList())) {
+ // it's an update but with different params
+ if (type == MirroredSolrRequest.Type.UPDATE && lastUpdateParams != null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) {
if (log.isTraceEnabled()) {
log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
}
- lastParamsAsNamedList = null;
- sendBatch(solrReqBatch, lastRecord, workUnit);
- solrReqBatch = new UpdateRequest();
+ // send previous batch, if any
+ sendBatch(updateReqBatch, type, lastRecord, workUnit);
+ updateReqBatch = new UpdateRequest();
+ lastUpdateParamsAsNamedList = null;
workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
}
- lastParams = solrReq.getParams();
- solrReqBatch.setParams(params);
- if (lastParamsAsNamedList == null) {
- lastParamsAsNamedList = lastParams.toNamedList();
- }
-
- List<SolrInputDocument> docs = solrReq.getDocuments();
- if (docs != null) {
- solrReqBatch.add(docs);
- }
- List<String> deletes = solrReq.getDeleteById();
- if (deletes != null) {
- solrReqBatch.deleteById(deletes);
- }
- List<String> deleteByQuery = solrReq.getDeleteQuery();
- if (deleteByQuery != null) {
- for (String delByQuery : deleteByQuery) {
- solrReqBatch.deleteByQuery(delByQuery);
+ lastUpdateParams = params;
+ if (type == MirroredSolrRequest.Type.UPDATE) {
+ if (updateReqBatch == null) {
+ // just initialize
+ updateReqBatch = new UpdateRequest();
+ }
+ UpdateRequest update = (UpdateRequest) solrReq;
+ MirroredSolrRequest.setParams(updateReqBatch, params);
+ if (lastUpdateParamsAsNamedList == null) {
+ lastUpdateParamsAsNamedList = lastUpdateParams.toNamedList();
}
+ // merge
+ List<SolrInputDocument> docs = update.getDocuments();
+ if (docs != null) {
+ updateReqBatch.add(docs);
+ }
+ List<String> deletes = update.getDeleteById();
+ if (deletes != null) {
+ updateReqBatch.deleteById(deletes);
+ }
+ List<String> deleteByQuery = update.getDeleteQuery();
+ if (deleteByQuery != null) {
+ for (String delByQuery : deleteByQuery) {
+ updateReqBatch.deleteByQuery(delByQuery);
+ }
+ }
+ } else {
+ // non-update requests should be sent immediately
+ sendBatch(req.getSolrRequest(), type, lastRecord, workUnit);
}
-
}
- sendBatch(solrReqBatch, lastRecord, workUnit);
+ if (updateReqBatch != null) {
+ sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
+ }
try {
partitionManager.checkForOffsetUpdates(partition);
} catch (Throwable e) {
@@ -305,11 +319,12 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
return true;
}
- public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
- UpdateRequest finalSolrReqBatch = solrReqBatch;
+ public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type type, ConsumerRecord<String, MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
+ SolrRequest finalSolrReqBatch = solrReqBatch;
+ // Kafka client is not thread-safe !!!
Future<?> future = executor.submit(() -> {
try {
- IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
+ IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(type, finalSolrReqBatch));
processResult(lastRecord, result);
} catch (MirroringException e) {
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index ab9e3d0..b726d14 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -5,10 +5,13 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -179,7 +182,7 @@ public class KafkaCrossDcConsumerTest {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
KafkaCrossDcConsumer consumer = createCrossDcConsumerSpy(mockConsumer);
- doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
+ doNothing().when(consumer).sendBatch(any(UpdateRequest.class), eq(MirroredSolrRequest.Type.UPDATE), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
// Set up the SolrMessageProcessor mock
SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
@@ -213,7 +216,7 @@ public class KafkaCrossDcConsumerTest {
public void testHandleValidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
-
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "1");
UpdateRequest validRequest = new UpdateRequest();
@@ -228,17 +231,39 @@ public class KafkaCrossDcConsumerTest {
spyConsumer.run();
// Verify that the valid MirroredSolrRequest was processed.
- verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+ verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
// Check if the UpdateRequest has the same content as the original validRequest
- return updateRequest.getDocuments().equals(validRequest.getDocuments()) &&
- updateRequest.getParams().equals(validRequest.getParams());
- }), eq(record), any());
+ return ((UpdateRequest) solrRequest).getDocuments().equals(validRequest.getDocuments()) &&
+ solrRequest.getParams().toNamedList().equals(validRequest.getParams().toNamedList());
+ }), eq(MirroredSolrRequest.Type.UPDATE), eq(record), any());
+ }
+
+ @Test
+ public void testHandleValidAdminRequest() throws Exception {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("test", "testConfig", 2, 2);
+
+ ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
+ ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+ when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+ spyConsumer.run();
+
+ // Verify that the valid MirroredSolrRequest was processed.
+ verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
+ // Check if the SolrRequest has the same content as the original validRequest
+ return solrRequest.getParams().toNamedList().equals(create.getParams().toNamedList());
+ }), eq(MirroredSolrRequest.Type.ADMIN), eq(record), any());
}
@Test
public void testHandleInvalidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class);
+ doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(mockSolrMessageProcessor).handleItem(any());
KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
@Override
public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
@@ -269,11 +294,12 @@ public class KafkaCrossDcConsumerTest {
spyConsumer.run();
// Verify that the valid MirroredSolrRequest was processed.
- verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+ verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
+ System.out.println(Utils.toJSONString(solrRequest));
// Check if the UpdateRequest has the same content as the original invalidRequest
- return updateRequest.getDocuments() == null &&
- updateRequest.getParams().equals(invalidRequest.getParams());
- }), eq(record), any());
+ return ((UpdateRequest) solrRequest).getDocuments() == null &&
+ solrRequest.getParams().toNamedList().equals(invalidRequest.getParams().toNamedList());
+ }), any(), eq(record), any());
}
@Test