You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/09 14:11:55 UTC
(camel) branch camel-3.21.x updated: remove lastResult from Kafka Camel component (#12002)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
new f1cc4f9fb39 remove lastResult from Kafka Camel component (#12002)
f1cc4f9fb39 is described below
commit f1cc4f9fb3981035bfab37830600c29f8e364779
Author: Mike Barlotta <Co...@users.noreply.github.com>
AuthorDate: Tue Jan 9 09:11:48 2024 -0500
remove lastResult from Kafka Camel component (#12002)
---
.../camel/component/kafka/KafkaFetchRecords.java | 46 ++++-----------------
.../consumer/support/KafkaRecordProcessor.java | 48 +++++++++-------------
.../support/KafkaRecordProcessorFacade.java | 38 +++++++----------
.../kafka/consumer/support/ProcessingResult.java | 20 +--------
4 files changed, 44 insertions(+), 108 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index e13f7d86e99..fe9b3bdf9ee 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -297,8 +297,7 @@ public class KafkaFetchRecords implements Runnable {
}
protected void startPolling() {
- long partitionLastOffset = -1;
-
+
try {
/*
* We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -307,7 +306,8 @@ public class KafkaFetchRecords implements Runnable {
lock.lock();
long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-
+ Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+
if (LOG.isTraceEnabled()) {
LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
}
@@ -315,10 +315,6 @@ public class KafkaFetchRecords implements Runnable {
KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
kafkaConsumer, threadId, commitManager, consumerListener);
- Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
- ProcessingResult lastResult = null;
-
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
if (consumerListener != null) {
@@ -327,43 +323,15 @@ public class KafkaFetchRecords implements Runnable {
}
}
- if (lastResult != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("This polling iteration is using lastresult on partition {} and offset {}",
- lastResult.getPartition(), lastResult.getPartitionLastOffset());
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("This polling iteration is using lastresult of null");
- }
- }
-
- ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
- if (result != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("This polling iteration had a result returned for partition {} and offset {}",
- result.getPartition(), result.getPartitionLastOffset());
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("This polling iteration had a result returned as null");
- }
- }
-
+ ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
updateTaskState();
+
+ // when breakOnFirstError we want to unsubscribe from Kafka
if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
LOG.debug("We hit an error ... setting flags to force reconnect");
// force re-connect
setReconnect(true);
setConnected(false);
- } else {
- lastResult = result;
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Setting lastresult to partition {} and offset {}",
- lastResult.getPartition(), lastResult.getPartitionLastOffset());
- }
}
}
@@ -397,6 +365,8 @@ public class KafkaFetchRecords implements Runnable {
e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
}
+ // why do we set this to -1
+ long partitionLastOffset = -1;
pollExceptionStrategy.handle(partitionLastOffset, e);
} finally {
// only close if not retry
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index e731e4c4763..1299bdc0ba8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -86,8 +85,7 @@ public class KafkaRecordProcessor {
public ProcessingResult processExchange(
Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
- boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
- ExceptionHandler exceptionHandler) {
+ boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
Message message = exchange.getMessage();
@@ -114,33 +112,31 @@ public class KafkaRecordProcessor {
} catch (Exception e) {
exchange.setException(e);
}
+
+ ProcessingResult result = ProcessingResult.newUnprocessed();
if (exchange.getException() != null) {
LOG.debug("An exception was thrown for record at partition {} and offset {}",
record.partition(), record.offset());
- boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
- exceptionHandler);
- return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
+ boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+ result = new ProcessingResult(breakOnErrorExit, true);
} else {
- return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
+ result = new ProcessingResult(false, exchange.getException() != null);
}
+
+ if (!result.isBreakOnErrorHit()) {
+ commitManager.recordOffset(topicPartition, record.offset());
+ }
+
+ return result;
}
private boolean processException(
Exchange exchange, TopicPartition topicPartition,
- ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
- ExceptionHandler exceptionHandler) {
+ ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
// processing failed due to an unhandled exception, what should we do
if (configuration.isBreakOnFirstError()) {
- if (lastResult.getPartition() != -1 &&
- lastResult.getPartition() != record.partition()) {
- LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " +
- " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
- topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
- record.partition(), record.offset());
- }
-
// we are failing and we should break out
if (LOG.isWarnEnabled()) {
Exception exc = exchange.getException();
@@ -150,17 +146,13 @@ public class KafkaRecordProcessor {
record.offset(), record.partition());
}
- // force commit, so we resume on next poll where we failed
- // except when the failure happened at the first message in a poll
- if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
- // we should just do a commit (vs the original forceCommit)
- // when route uses NOOP Commit Manager it will rely
- // on the route implementation to explicitly commit offset
- // when route uses Synch/Asynch Commit Manager it will
- // ALWAYS commit the offset for the failing record
- // and will ALWAYS retry it
- commitManager.commit(topicPartition);
- }
+ // we should just do a commit (vs the original forceCommit)
+ // when route uses NOOP Commit Manager it will rely
+ // on the route implementation to explicitly commit offset
+ // when route uses Synch/Asynch Commit Manager it will
+ // ALWAYS commit the offset for the failing record
+ // and will ALWAYS retry it
+ commitManager.commit(topicPartition);
// continue to next partition
return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 44573daa60d..b114c3ff8a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,18 +54,16 @@ public class KafkaRecordProcessorFacade {
return camelKafkaConsumer.isStopping();
}
- public ProcessingResult processPolledRecords(
- ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) {
+ public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
logRecords(allRecords);
+
+ ProcessingResult result = ProcessingResult.newUnprocessed();
Set<TopicPartition> partitions = allRecords.partitions();
Iterator<TopicPartition> partitionIterator = partitions.iterator();
LOG.debug("Poll received records on {} partitions", partitions.size());
- ProcessingResult lastResult
- = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
-
while (partitionIterator.hasNext() && !isStopping()) {
TopicPartition partition = partitionIterator.next();
@@ -76,34 +74,32 @@ public class KafkaRecordProcessorFacade {
logRecordsInPartition(partitionRecords, partition);
- while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+ while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
ConsumerRecord<Object, Object> record = recordIterator.next();
LOG.debug("Processing record on partition {} with offset {}", record.partition(), record.offset());
- lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+ result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
kafkaRecordProcessor, record);
- LOG.debug(
- "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}",
- record.partition(), record.offset(), lastResult.getPartition(), lastResult.getPartitionLastOffset());
+ LOG.debug("Processed record on partition {} with offset {}", record.partition(), record.offset());
if (consumerListener != null) {
- if (!consumerListener.afterProcess(lastResult)) {
+ if (!consumerListener.afterProcess(result)) {
commitManager.commit(partition);
- return lastResult;
+ return result;
}
}
}
- if (!lastResult.isBreakOnErrorHit()) {
+ if (!result.isBreakOnErrorHit()) {
LOG.debug("Committing offset on successful execution");
// all records processed from partition so commit them
commitManager.commit(partition);
}
}
- return lastResult;
+ return result;
}
private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
@@ -123,7 +119,6 @@ public class KafkaRecordProcessorFacade {
TopicPartition partition,
boolean partitionHasNext,
boolean recordHasNext,
- final ProcessingResult lastResult,
KafkaRecordProcessor kafkaRecordProcessor,
ConsumerRecord<Object, Object> record) {
@@ -131,18 +126,13 @@ public class KafkaRecordProcessorFacade {
Exchange exchange = camelKafkaConsumer.createExchange(false);
- ProcessingResult currentResult
- = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
- recordHasNext, record, lastResult, camelKafkaConsumer.getExceptionHandler());
-
- if (!currentResult.isBreakOnErrorHit()) {
- commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
- }
-
+ ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+ recordHasNext, record, camelKafkaConsumer.getExceptionHandler());
+
// success so release the exchange
camelKafkaConsumer.releaseExchange(exchange, false);
- return currentResult;
+ return result;
}
private void logRecord(ConsumerRecord<Object, Object> record) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index fe3afd6ee8d..87f88c6e23d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,24 +17,16 @@
package org.apache.camel.component.kafka.consumer.support;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
public final class ProcessingResult {
private static final ProcessingResult UNPROCESSED_RESULT
- = new ProcessingResult(
- false,
- AbstractCommitManager.NON_PARTITION,
- AbstractCommitManager.START_OFFSET, false);
+ = new ProcessingResult(false, false);
private final boolean breakOnErrorHit;
- private final long lastPartition;
- private final long partitionLastOffset;
private final boolean failed;
- ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) {
+ ProcessingResult(boolean breakOnErrorHit, boolean failed) {
this.breakOnErrorHit = breakOnErrorHit;
- this.lastPartition = lastPartition;
- this.partitionLastOffset = partitionLastOffset;
this.failed = failed;
}
@@ -42,14 +34,6 @@ public final class ProcessingResult {
return breakOnErrorHit;
}
- public long getPartitionLastOffset() {
- return partitionLastOffset;
- }
-
- public long getPartition() {
- return lastPartition;
- }
-
public boolean isFailed() {
return failed;
}