You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/16 12:48:11 UTC
(solr-sandbox) branch main updated: Fix KafkaCrossDcConsumer NPE, add unit test. (#88)
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 0377460 Fix KafkaCrossDcConsumer NPE, add unit test. (#88)
0377460 is described below
commit 03774606768d306897893b664ee9e649a5d76419
Author: Andrzej Białecki <ab...@apache.org>
AuthorDate: Thu Nov 16 13:48:06 2023 +0100
Fix KafkaCrossDcConsumer NPE, add unit test. (#88)
---
.../org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java | 4 +++-
.../apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java | 8 +++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index b10924d..b711b56 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -227,7 +227,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
}
// send previous batch, if any
- sendBatch(updateReqBatch, type, lastRecord, workUnit);
+ if (updateReqBatch != null) {
+ sendBatch(updateReqBatch, type, lastRecord, workUnit);
+ }
updateReqBatch = new UpdateRequest();
lastUpdateParamsAsNamedList = null;
workUnit = new PartitionManager.WorkUnit(partition);
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index ccc6a61..1dbc431 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -244,8 +244,9 @@ public class KafkaCrossDcConsumerTest {
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)).when(messageProcessorMock).handleItem(any());
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("test", "testConfig", 2, 2);
- ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
- ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+ ConsumerRecord<String, MirroredSolrRequest> record1 = new ConsumerRecord<>("test-topic", 0, 0, "key1", new MirroredSolrRequest(MirroredSolrRequest.Type.ADMIN, create));
+ ConsumerRecord<String, MirroredSolrRequest> record2 = new ConsumerRecord<>("test-topic", 0, 1, "key2", new MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, new UpdateRequest()));
+ ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record1, record2)));
when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
@@ -255,7 +256,8 @@ public class KafkaCrossDcConsumerTest {
verify(spyConsumer, times(1)).sendBatch(argThat(solrRequest -> {
// Check if the SolrRequest has the same content as the original validRequest
return solrRequest.getParams().toNamedList().equals(create.getParams().toNamedList());
- }), eq(MirroredSolrRequest.Type.ADMIN), eq(record), any());
+ }), eq(MirroredSolrRequest.Type.ADMIN), eq(record1), any());
+ verify(spyConsumer, times(1)).sendBatch(any(), eq(MirroredSolrRequest.Type.UPDATE), eq(record2), any());
}
@Test