You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/16 12:48:11 UTC

(solr-sandbox) branch main updated: Fix KafkaCrossDcConsumer NPE, add unit test. (#88)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 0377460  Fix KafkaCrossDcConsumer NPE, add unit test. (#88)
0377460 is described below

commit 03774606768d306897893b664ee9e649a5d76419
Author: Andrzej Białecki <ab...@apache.org>
AuthorDate: Thu Nov 16 13:48:06 2023 +0100

    Fix KafkaCrossDcConsumer NPE, add unit test. (#88)
---
 .../org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java    | 4 +++-
 .../apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java    | 8 +++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index b10924d..b711b56 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -227,7 +227,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
                 log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
               }
               // send previous batch, if any
-              sendBatch(updateReqBatch, type, lastRecord, workUnit);
+              if (updateReqBatch != null) {
+                sendBatch(updateReqBatch, type, lastRecord, workUnit);
+              }
               updateReqBatch = new UpdateRequest();
               lastUpdateParamsAsNamedList = null;
               workUnit = new PartitionManager.WorkUnit(partition);
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index ccc6a61..1dbc431 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -244,8 +244,9 @@ public class KafkaCrossDcConsumerTest {
         doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
         CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("test", "testConfig", 2, 2);
 
-        ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
-        ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+        ConsumerRecord<String, MirroredSolrRequest> record1 = new ConsumerRecord<>("test-topic", 0, 0, "key1", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
+        ConsumerRecord<String, MirroredSolrRequest> record2 = new ConsumerRecord<>("test-topic", 0, 1, "key2", new MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, new UpdateRequest()));
+        ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record1, record2)));
 
         when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
 
@@ -255,7 +256,8 @@ public class KafkaCrossDcConsumerTest {
         verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
             // Check if the SolrRequest has the same content as the original validRequest
             return solrRequest.getParams().toNamedList().equals(create.getParams().toNamedList());
-        }), eq(MirroredSolrRequest.Type.ADMIN), eq(record), any());
+        }), eq(MirroredSolrRequest.Type.ADMIN), eq(record1), any());
+        verify(spyConsumer, times(1)).sendBatch(any(), eq(MirroredSolrRequest.Type.UPDATE), eq(record2), any());
     }
 
     @Test