Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/06 13:40:59 UTC

(solr-sandbox) branch main updated: Fix missing support for other request types. (#81)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 9575cd0  Fix missing support for other request types. (#81)
9575cd0 is described below

commit 9575cd04018aecd18e269f7a61fd8178f436e4a2
Author: Andrzej Białecki <ab...@apache.org>
AuthorDate: Mon Nov 6 14:40:54 2023 +0100

    Fix missing support for other request types. (#81)
---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 83 +++++++++++++---------
 .../crossdc/consumer/KafkaCrossDcConsumerTest.java | 46 +++++++++---
 2 files changed, 85 insertions(+), 44 deletions(-)
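
For context, the core of the change in KafkaCrossDcConsumer is that UPDATE requests are still merged into a single UpdateRequest batch per partition, while any other mirrored request type (for example ADMIN) is forwarded immediately instead of being cast to UpdateRequest. The following is a minimal, self-contained sketch of that dispatch pattern; the class and method names are illustrative only, and the committed code is in the diff below.

// Illustrative sketch only -- simplified names, not the committed code.
// The real logic lives in KafkaCrossDcConsumer (see the diff below).
import java.util.List;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;

class RequestDispatchSketch {

  // stand-in for MirroredSolrRequest.Type
  enum Type { UPDATE, ADMIN }

  // lazily created running batch for consecutive UPDATE requests
  private UpdateRequest updateBatch;

  void accept(Type type, SolrRequest<?> solrReq) {
    if (type == Type.UPDATE) {
      if (updateBatch == null) {
        updateBatch = new UpdateRequest();
      }
      UpdateRequest update = (UpdateRequest) solrReq;
      // merge documents and deletes into the running batch
      List<SolrInputDocument> docs = update.getDocuments();
      if (docs != null) {
        updateBatch.add(docs);
      }
      List<String> deleteById = update.getDeleteById();
      if (deleteById != null) {
        updateBatch.deleteById(deleteById);
      }
      List<String> deleteByQuery = update.getDeleteQuery();
      if (deleteByQuery != null) {
        deleteByQuery.forEach(updateBatch::deleteByQuery);
      }
    } else {
      // non-update requests (e.g. ADMIN) are sent right away, unbatched
      send(solrReq, type);
    }
  }

  // flush any pending update batch at the end of a poll loop
  void flush() {
    if (updateBatch != null) {
      send(updateBatch, Type.UPDATE);
      updateBatch = null;
    }
  }

  void send(SolrRequest<?> req, Type type) {
    // in the real consumer this is sendBatch(...), submitted to an executor
  }
}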

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 63939ff..2bf9f7a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -7,6 +7,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -191,7 +192,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
         log.trace("poll return {} records", records.count());
       }
 
-      UpdateRequest solrReqBatch = null;
+      UpdateRequest updateReqBatch = null;
 
       ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
 
@@ -203,9 +204,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
         workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
         partitionWork.partitionQueue.add(workUnit);
         try {
-          ModifiableSolrParams lastParams = null;
-          NamedList lastParamsAsNamedList = null;
-          solrReqBatch = new UpdateRequest();
+          ModifiableSolrParams lastUpdateParams = null;
+          NamedList lastUpdateParamsAsNamedList = null;
           for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : partitionRecords) {
             if (log.isTraceEnabled()) {
               log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(),
@@ -214,48 +214,62 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
             lastRecord = requestRecord;
             MirroredSolrRequest req = requestRecord.value();
-            UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
-            ModifiableSolrParams params = solrReq.getParams();
+            SolrRequest solrReq = req.getSolrRequest();
+            MirroredSolrRequest.Type type = req.getType();
+            ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
             if (log.isTraceEnabled()) {
-              log.trace("params={}", params);
+              log.trace("-- picked type={}, params={}", req.getType(), params);
             }
 
-            if (lastParams != null && !lastParams.toNamedList().equals(params.toNamedList())) {
+            // it's an update but with different params
+            if (type == MirroredSolrRequest.Type.UPDATE && lastUpdateParams != null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) {
               if (log.isTraceEnabled()) {
                 log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
               }
-              lastParamsAsNamedList = null;
-              sendBatch(solrReqBatch, lastRecord, workUnit);
-              solrReqBatch = new UpdateRequest();
+              // send previous batch, if any
+              sendBatch(updateReqBatch, type, lastRecord, workUnit);
+              updateReqBatch = new UpdateRequest();
+              lastUpdateParamsAsNamedList = null;
               workUnit = new PartitionManager.WorkUnit(partition);
               workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
               partitionWork.partitionQueue.add(workUnit);
             }
 
-            lastParams = solrReq.getParams();
-            solrReqBatch.setParams(params);
-            if (lastParamsAsNamedList == null) {
-              lastParamsAsNamedList = lastParams.toNamedList();
-            }
-
-            List<SolrInputDocument> docs = solrReq.getDocuments();
-            if (docs != null) {
-              solrReqBatch.add(docs);
-            }
-            List<String> deletes = solrReq.getDeleteById();
-            if (deletes != null) {
-              solrReqBatch.deleteById(deletes);
-            }
-            List<String> deleteByQuery = solrReq.getDeleteQuery();
-            if (deleteByQuery != null) {
-              for (String delByQuery : deleteByQuery) {
-                solrReqBatch.deleteByQuery(delByQuery);
+            lastUpdateParams = params;
+            if (type == MirroredSolrRequest.Type.UPDATE) {
+              if (updateReqBatch == null) {
+                // just initialize
+                updateReqBatch = new UpdateRequest();
+              }
+              UpdateRequest update = (UpdateRequest) solrReq;
+              MirroredSolrRequest.setParams(updateReqBatch, params);
+              if (lastUpdateParamsAsNamedList == null) {
+                lastUpdateParamsAsNamedList = lastUpdateParams.toNamedList();
               }
+              // merge
+              List<SolrInputDocument> docs = update.getDocuments();
+              if (docs != null) {
+                updateReqBatch.add(docs);
+              }
+              List<String> deletes = update.getDeleteById();
+              if (deletes != null) {
+                updateReqBatch.deleteById(deletes);
+              }
+              List<String> deleteByQuery = update.getDeleteQuery();
+              if (deleteByQuery != null) {
+                for (String delByQuery : deleteByQuery) {
+                  updateReqBatch.deleteByQuery(delByQuery);
+                }
+              }
+            } else {
+              // non-update requests should be sent immediately
+              sendBatch(req.getSolrRequest(), type, lastRecord, workUnit);
             }
-
           }
 
-          sendBatch(solrReqBatch, lastRecord, workUnit);
+          if (updateReqBatch != null) {
+            sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
+          }
           try {
             partitionManager.checkForOffsetUpdates(partition);
           } catch (Throwable e) {
@@ -305,11 +319,12 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
     return true;
   }
 
-  public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
-    UpdateRequest finalSolrReqBatch = solrReqBatch;
+  public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type type, ConsumerRecord<String, MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
+    SolrRequest finalSolrReqBatch = solrReqBatch;
+    // Kafka client is not thread-safe !!!
     Future<?> future = executor.submit(() -> {
       try {
-        IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
+        IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(type, finalSolrReqBatch));
 
         processResult(lastRecord, result);
       } catch (MirroringException e) {
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index ab9e3d0..b726d14 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -5,10 +5,13 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -179,7 +182,7 @@ public class KafkaCrossDcConsumerTest {
         KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
         KafkaCrossDcConsumer consumer = createCrossDcConsumerSpy(mockConsumer);
 
-        doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
+        doNothing().when(consumer).sendBatch(any(UpdateRequest.class), eq(MirroredSolrRequest.Type.UPDATE), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
 
         // Set up the SolrMessageProcessor mock
         SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
@@ -213,7 +216,7 @@ public class KafkaCrossDcConsumerTest {
     public void testHandleValidMirroredSolrRequest() {
         KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
         KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
-
+        doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
         SolrInputDocument doc = new SolrInputDocument();
         doc.addField("id", "1");
         UpdateRequest validRequest = new UpdateRequest();
@@ -228,17 +231,39 @@ public class KafkaCrossDcConsumerTest {
         spyConsumer.run();
 
         // Verify that the valid MirroredSolrRequest was processed.
-        verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+        verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
             // Check if the UpdateRequest has the same content as the original validRequest
-            return updateRequest.getDocuments().equals(validRequest.getDocuments()) &&
-                    updateRequest.getParams().equals(validRequest.getParams());
-        }), eq(record), any());
+            return ((UpdateRequest) solrRequest).getDocuments().equals(validRequest.getDocuments()) &&
+                    solrRequest.getParams().toNamedList().equals(validRequest.getParams().toNamedList());
+        }), eq(MirroredSolrRequest.Type.UPDATE), eq(record), any());
+    }
+
+    @Test
+    public void testHandleValidAdminRequest() throws Exception {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+        KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
+        doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
+        CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("test", "testConfig", 2, 2);
+
+        ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
+        ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+        when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+        spyConsumer.run();
+
+        // Verify that the valid MirroredSolrRequest was processed.
+        verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
+            // Check if the SolrRequest has the same content as the original validRequest
+            return solrRequest.getParams().toNamedList().equals(create.getParams().toNamedList());
+        }), eq(MirroredSolrRequest.Type.ADMIN), eq(record), any());
     }
 
     @Test
     public void testHandleInvalidMirroredSolrRequest() {
         KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
         SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class);
+        doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(mockSolrMessageProcessor).handleItem(any());
         KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
             @Override
             public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
@@ -269,11 +294,12 @@ public class KafkaCrossDcConsumerTest {
         spyConsumer.run();
 
         // Verify that the valid MirroredSolrRequest was processed.
-        verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+        verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
+            System.out.println(Utils.toJSONString(solrRequest));
             // Check if the UpdateRequest has the same content as the original invalidRequest
-            return updateRequest.getDocuments() == null &&
-                    updateRequest.getParams().equals(invalidRequest.getParams());
-        }), eq(record), any());
+            return ((UpdateRequest) solrRequest).getDocuments() == null &&
+                solrRequest.getParams().toNamedList().equals(invalidRequest.getParams().toNamedList());
+        }), any(), eq(record), any());
     }
 
     @Test