You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@solr.apache.org by ab...@apache.org on 2023/11/13 12:50:20 UTC
(solr-sandbox) branch main updated: Dead letter queue fix (#82)
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new c816e78 Dead letter queue fix (#82)
c816e78 is described below
commit c816e78f7e44b42f1bffe047b65e2c7f9b61d276
Author: Marcin Górski <ma...@gmail.com>
AuthorDate: Mon Nov 13 13:50:16 2023 +0100
Dead letter queue fix (#82)
Co-authored-by: marcingorski <ma...@lucidworks.com>
---
.../java/org/apache/solr/crossdc/common/MirroredSolrRequest.java | 6 +++++-
.../org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java | 9 +++++----
.../apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java | 3 +--
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
index b85616b..8fe1eb3 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -172,7 +172,11 @@ public class MirroredSolrRequest {
}
public MirroredSolrRequest(final Type type, final SolrRequest solrRequest) {
- this(type, 1, solrRequest, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+ this(type, 1, solrRequest);
+ }
+
+ public MirroredSolrRequest(final Type type, final int attempt, final SolrRequest solrRequest) {
+ this(type, attempt, solrRequest, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
}
public MirroredSolrRequest(final Type type, final int attempt, final SolrRequest solrRequest, final long submitTimeNanos) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 2bf9f7a..b10924d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -324,9 +324,10 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
// Kafka client is not thread-safe !!!
Future<?> future = executor.submit(() -> {
try {
- IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(type, finalSolrReqBatch));
+ final MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(type, lastRecord.value().getAttempt(), finalSolrReqBatch);
+ final IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(mirroredSolrRequest);
- processResult(lastRecord, result);
+ processResult(result);
} catch (MirroringException e) {
// We don't really know what to do here
log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
@@ -339,14 +340,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
- void processResult(ConsumerRecord<String,MirroredSolrRequest> record, IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+ void processResult(IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
switch (result.status()) {
case FAILED_RESUBMIT:
if (log.isTraceEnabled()) {
log.trace("result=failed-resubmit");
}
metrics.counter("failed-resubmit").inc();
- final int attempt = record.value().getAttempt();
+ final int attempt = result.newItem().getAttempt();
if (attempt > this.maxAttempts) {
log.info("Sending message to dead letter queue because of max attempts limit with current value = {}", attempt);
kafkaMirroringSink.submitToDlq(result.newItem());
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index b726d14..ccc6a61 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -196,8 +196,7 @@ public class KafkaCrossDcConsumerTest {
consumer.kafkaMirroringSink = mockKafkaMirroringSink;
// Call the method to test
- ConsumerRecord<String, MirroredSolrRequest> record = createSampleConsumerRecord();
- consumer.processResult(record, failedResubmitResult);
+ consumer.processResult(failedResubmitResult);
// Verify that the KafkaMirroringSink.submit() method was called
verify(consumer.kafkaMirroringSink, times(1)).submit(request);