You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/13 12:50:20 UTC

(solr-sandbox) branch main updated: Dead letter queue fix (#82)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new c816e78  Dead letter queue fix (#82)
c816e78 is described below

commit c816e78f7e44b42f1bffe047b65e2c7f9b61d276
Author: Marcin Górski <ma...@gmail.com>
AuthorDate: Mon Nov 13 13:50:16 2023 +0100

    Dead letter queue fix (#82)
    
    Co-authored-by: marcingorski <ma...@lucidworks.com>
---
 .../java/org/apache/solr/crossdc/common/MirroredSolrRequest.java | 6 +++++-
 .../org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java   | 9 +++++----
 .../apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java   | 3 +--
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
index b85616b..8fe1eb3 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -172,7 +172,11 @@ public class MirroredSolrRequest {
     }
 
     public MirroredSolrRequest(final Type type, final SolrRequest solrRequest) {
-        this(type, 1, solrRequest, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+        this(type, 1, solrRequest);
+    }
+
+    public MirroredSolrRequest(final Type type, final int attempt, final SolrRequest solrRequest) {
+        this(type, attempt, solrRequest, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
     }
 
     public MirroredSolrRequest(final Type type, final int attempt, final SolrRequest solrRequest, final long submitTimeNanos) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 2bf9f7a..b10924d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -324,9 +324,10 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
     // Kafka client is not thread-safe !!!
     Future<?> future = executor.submit(() -> {
       try {
-        IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(type, finalSolrReqBatch));
+        final MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(type, lastRecord.value().getAttempt(), finalSolrReqBatch);
+        final IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(mirroredSolrRequest);
 
-        processResult(lastRecord, result);
+        processResult(result);
       } catch (MirroringException e) {
         // We don't really know what to do here
         log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
@@ -339,14 +340,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
 
 
-  void processResult(ConsumerRecord<String,MirroredSolrRequest> record, IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+  void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
     switch (result.status()) {
       case FAILED_RESUBMIT:
         if (log.isTraceEnabled()) {
           log.trace("result=failed-resubmit");
         }
         metrics.counter("failed-resubmit").inc();
-        final int attempt = record.value().getAttempt();
+        final int attempt = result.newItem().getAttempt();
         if (attempt > this.maxAttempts) {
           log.info("Sending message to dead letter queue because of max attempts limit with current value = {}", attempt);
           kafkaMirroringSink.submitToDlq(result.newItem());
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index b726d14..ccc6a61 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -196,8 +196,7 @@ public class KafkaCrossDcConsumerTest {
         consumer.kafkaMirroringSink = mockKafkaMirroringSink;
 
         // Call the method to test
-        ConsumerRecord<String, MirroredSolrRequest> record = createSampleConsumerRecord();
-        consumer.processResult(record, failedResubmitResult);
+        consumer.processResult(failedResubmitResult);
 
         // Verify that the KafkaMirroringSink.submit() method was called
         verify(consumer.kafkaMirroringSink, times(1)).submit(request);