You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/09 14:11:55 UTC

(camel) branch camel-3.21.x updated: remove lastResult from Kafka Camel component (#12002)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new f1cc4f9fb39 remove lastResult from Kafka Camel component (#12002)
f1cc4f9fb39 is described below

commit f1cc4f9fb3981035bfab37830600c29f8e364779
Author: Mike Barlotta <Co...@users.noreply.github.com>
AuthorDate: Tue Jan 9 09:11:48 2024 -0500

    remove lastResult from Kafka Camel component (#12002)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 46 ++++-----------------
 .../consumer/support/KafkaRecordProcessor.java     | 48 +++++++++-------------
 .../support/KafkaRecordProcessorFacade.java        | 38 +++++++----------
 .../kafka/consumer/support/ProcessingResult.java   | 20 +--------
 4 files changed, 44 insertions(+), 108 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index e13f7d86e99..fe9b3bdf9ee 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -297,8 +297,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+        
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -307,7 +306,8 @@ public class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+            
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
@@ -315,10 +315,6 @@ public class KafkaFetchRecords implements Runnable {
             KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
-            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
-            ProcessingResult lastResult = null;
-
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -327,43 +323,15 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
-                if (lastResult != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult of null");
-                    }
-                }
-
-                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
-                if (result != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}",
-                                result.getPartition(), result.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned as null");
-                    }
-                }
-
+                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
                 updateTaskState();
+                
+                // when breakOnFirstError is hit we want to unsubscribe from Kafka
                 if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
-                } else {
-                    lastResult = result;
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Setting lastresult to partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
                 }
 
             }
@@ -397,6 +365,8 @@ public class KafkaFetchRecords implements Runnable {
                         e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
             }
 
+            // why do we set this to -1?
+            long partitionLastOffset = -1;
             pollExceptionStrategy.handle(partitionLastOffset, e);
         } finally {
             // only close if not retry
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index e731e4c4763..1299bdc0ba8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -86,8 +85,7 @@ public class KafkaRecordProcessor {
 
     public ProcessingResult processExchange(
             Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
 
         Message message = exchange.getMessage();
 
@@ -114,33 +112,31 @@ public class KafkaRecordProcessor {
         } catch (Exception e) {
             exchange.setException(e);
         }
+        
+        ProcessingResult result = ProcessingResult.newUnprocessed();
         if (exchange.getException() != null) {
             LOG.debug("An exception was thrown for record at partition {} and offset {}",
                     record.partition(), record.offset());
 
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
-                    exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
+            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+            result = new ProcessingResult(breakOnErrorExit, true);
         } else {
-            return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
+            result = new ProcessingResult(false, exchange.getException() != null);
         }
+        
+        if (!result.isBreakOnErrorHit()) {
+            commitManager.recordOffset(topicPartition, record.offset());
+        }
+
+        return result;
     }
 
     private boolean processException(
             Exchange exchange, TopicPartition topicPartition,
-            ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
-            if (lastResult.getPartition() != -1 &&
-                    lastResult.getPartition() != record.partition()) {
-                LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " +
-                          " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
-                        topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
-                        record.partition(), record.offset());
-            }
-
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
                 Exception exc = exchange.getException();
@@ -150,17 +146,13 @@ public class KafkaRecordProcessor {
                         record.offset(), record.partition());
             }
 
-            // force commit, so we resume on next poll where we failed 
-            // except when the failure happened at the first message in a poll
-            if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-                // we should just do a commit (vs the original forceCommit)
-                // when route uses NOOP Commit Manager it will rely
-                // on the route implementation to explicitly commit offset
-                // when route uses Synch/Asynch Commit Manager it will 
-                // ALWAYS commit the offset for the failing record
-                // and will ALWAYS retry it
-                commitManager.commit(topicPartition);
-            }
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will 
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
 
             // continue to next partition
             return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 44573daa60d..b114c3ff8a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,18 +54,16 @@ public class KafkaRecordProcessorFacade {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(
-            ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
+        
+        ProcessingResult result = ProcessingResult.newUnprocessed();
 
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
         LOG.debug("Poll received records on {} partitions", partitions.size());
 
-        ProcessingResult lastResult
-                = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
-
         while (partitionIterator.hasNext() && !isStopping()) {
             TopicPartition partition = partitionIterator.next();
 
@@ -76,34 +74,32 @@ public class KafkaRecordProcessorFacade {
 
             logRecordsInPartition(partitionRecords, partition);
 
-            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
                 ConsumerRecord<Object, Object> record = recordIterator.next();
 
                 LOG.debug("Processing record on partition {} with offset {}", record.partition(), record.offset());
 
-                lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, record);
 
-                LOG.debug(
-                        "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}",
-                        record.partition(), record.offset(), lastResult.getPartition(), lastResult.getPartitionLastOffset());
+                LOG.debug("Processed record on partition {} with offset {}", record.partition(), record.offset());
 
                 if (consumerListener != null) {
-                    if (!consumerListener.afterProcess(lastResult)) {
+                    if (!consumerListener.afterProcess(result)) {
                         commitManager.commit(partition);
-                        return lastResult;
+                        return result;
                     }
                 }
             }
 
-            if (!lastResult.isBreakOnErrorHit()) {
+            if (!result.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
                 commitManager.commit(partition);
             }
         }
 
-        return lastResult;
+        return result;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
@@ -123,7 +119,6 @@ public class KafkaRecordProcessorFacade {
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            final ProcessingResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> record) {
 
@@ -131,18 +126,13 @@ public class KafkaRecordProcessorFacade {
 
         Exchange exchange = camelKafkaConsumer.createExchange(false);
 
-        ProcessingResult currentResult
-                = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                        recordHasNext, record, lastResult, camelKafkaConsumer.getExceptionHandler());
-
-        if (!currentResult.isBreakOnErrorHit()) {
-            commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
-        }
-
+        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                recordHasNext, record, camelKafkaConsumer.getExceptionHandler());
+        
         // success so release the exchange
         camelKafkaConsumer.releaseExchange(exchange, false);
 
-        return currentResult;
+        return result;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> record) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index fe3afd6ee8d..87f88c6e23d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,24 +17,16 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
-            = new ProcessingResult(
-                    false,
-                    AbstractCommitManager.NON_PARTITION,
-                    AbstractCommitManager.START_OFFSET, false);
+            = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
-    private final long lastPartition;
-    private final long partitionLastOffset;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) {
+    ProcessingResult(boolean breakOnErrorHit, boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
-        this.lastPartition = lastPartition;
-        this.partitionLastOffset = partitionLastOffset;
         this.failed = failed;
     }
 
@@ -42,14 +34,6 @@ public final class ProcessingResult {
         return breakOnErrorHit;
     }
 
-    public long getPartitionLastOffset() {
-        return partitionLastOffset;
-    }
-
-    public long getPartition() {
-        return lastPartition;
-    }
-
     public boolean isFailed() {
         return failed;
     }