You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/10 09:24:04 UTC

(camel) 01/02: remove lastResult from Kafka Camel component (#12002)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cf84a59a0ead8a58a9ab4670ff37dfb942c963c1
Author: Mike Barlotta <Co...@users.noreply.github.com>
AuthorDate: Tue Jan 9 09:11:48 2024 -0500

    remove lastResult from Kafka Camel component (#12002)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 46 ++++-----------------
 .../consumer/support/KafkaRecordProcessor.java     | 48 +++++++++-------------
 .../support/KafkaRecordProcessorFacade.java        | 37 ++++++-----------
 .../kafka/consumer/support/ProcessingResult.java   | 20 +--------
 4 files changed, 43 insertions(+), 108 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2cdd516d0c4..9a5004cc2de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -305,8 +305,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+        
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -315,7 +314,8 @@ public class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+            
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
@@ -323,10 +323,6 @@ public class KafkaFetchRecords implements Runnable {
             KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
-            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
-            ProcessingResult lastResult = null;
-
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -335,43 +331,15 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
-                if (lastResult != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult of null");
-                    }
-                }
-
-                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
-                if (result != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}",
-                                result.getPartition(), result.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned as null");
-                    }
-                }
-
+                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
                 updateTaskState();
+                
+                // when breakOnFirstError we want to unsubscribe from Kafka 
                 if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
-                } else {
-                    lastResult = result;
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Setting lastresult to partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
                 }
 
             }
@@ -405,6 +373,8 @@ public class KafkaFetchRecords implements Runnable {
                         e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
             }
 
+            // why do we set this to -1
+            long partitionLastOffset = -1;
             pollExceptionStrategy.handle(partitionLastOffset, e);
         } finally {
             // only close if not retry
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 2954836f5f6..b06e53493e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -86,8 +85,7 @@ public class KafkaRecordProcessor {
 
     public ProcessingResult processExchange(
             Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
 
         Message message = exchange.getMessage();
 
@@ -114,33 +112,31 @@ public class KafkaRecordProcessor {
         } catch (Exception e) {
             exchange.setException(e);
         }
+
+        ProcessingResult result = ProcessingResult.newUnprocessed();
         if (exchange.getException() != null) {
             LOG.debug("An exception was thrown for record at partition {} and offset {}",
                     record.partition(), record.offset());
 
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
-                    exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
+            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+            result = new ProcessingResult(breakOnErrorExit, true);
         } else {
-            return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
+            result = new ProcessingResult(false, exchange.getException() != null);
+        }
+
+        if (!result.isBreakOnErrorHit()) {
+            commitManager.recordOffset(topicPartition, record.offset());
         }
+
+        return result;
     }
 
     private boolean processException(
             Exchange exchange, TopicPartition topicPartition,
-            ConsumerRecord<Object, Object> consumerRecord, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
-            if (lastResult.getPartition() != -1 &&
-                    lastResult.getPartition() != consumerRecord.partition()) {
-                LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " +
-                          " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
-                        topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
-                        consumerRecord.partition(), consumerRecord.offset());
-            }
-
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
                 Exception exc = exchange.getException();
@@ -150,17 +146,13 @@ public class KafkaRecordProcessor {
                         consumerRecord.offset(), consumerRecord.partition());
             }
 
-            // force commit, so we resume on next poll where we failed
-            // except when the failure happened at the first message in a poll
-            if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-                // we should just do a commit (vs the original forceCommit)
-                // when route uses NOOP Commit Manager it will rely
-                // on the route implementation to explicitly commit offset
-                // when route uses Synch/Asynch Commit Manager it will
-                // ALWAYS commit the offset for the failing record
-                // and will ALWAYS retry it
-                commitManager.commit(topicPartition);
-            }
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
 
             // continue to next partition
             return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index f6317510fd9..470e2ea01e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,18 +54,16 @@ public class KafkaRecordProcessorFacade {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(
-            ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
 
+        ProcessingResult result = ProcessingResult.newUnprocessed();
+
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
         LOG.debug("Poll received records on {} partitions", partitions.size());
 
-        ProcessingResult lastResult
-                = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
-
         while (partitionIterator.hasNext() && !isStopping()) {
             TopicPartition partition = partitionIterator.next();
 
@@ -76,36 +74,33 @@ public class KafkaRecordProcessorFacade {
 
             logRecordsInPartition(partitionRecords, partition);
 
-            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
                 ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
 
                 LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
                         consumerRecord.offset());
 
-                lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, consumerRecord);
 
-                LOG.debug(
-                        "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}",
-                        consumerRecord.partition(), consumerRecord.offset(), lastResult.getPartition(),
-                        lastResult.getPartitionLastOffset());
+                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(), consumerRecord.offset());
 
                 if (consumerListener != null) {
-                    if (!consumerListener.afterProcess(lastResult)) {
+                    if (!consumerListener.afterProcess(result)) {
                         commitManager.commit(partition);
-                        return lastResult;
+                        return result;
                     }
                 }
             }
 
-            if (!lastResult.isBreakOnErrorHit()) {
+            if (!result.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
                 commitManager.commit(partition);
             }
         }
 
-        return lastResult;
+        return result;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
@@ -125,7 +120,6 @@ public class KafkaRecordProcessorFacade {
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            final ProcessingResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> consumerRecord) {
 
@@ -133,18 +127,13 @@ public class KafkaRecordProcessorFacade {
 
         Exchange exchange = camelKafkaConsumer.createExchange(false);
 
-        ProcessingResult currentResult
-                = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                        recordHasNext, consumerRecord, lastResult, camelKafkaConsumer.getExceptionHandler());
-
-        if (!currentResult.isBreakOnErrorHit()) {
-            commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
-        }
+        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
 
         // success so release the exchange
         camelKafkaConsumer.releaseExchange(exchange, false);
 
-        return currentResult;
+        return result;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index fe3afd6ee8d..87f88c6e23d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,24 +17,16 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
-            = new ProcessingResult(
-                    false,
-                    AbstractCommitManager.NON_PARTITION,
-                    AbstractCommitManager.START_OFFSET, false);
+            = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
-    private final long lastPartition;
-    private final long partitionLastOffset;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) {
+    ProcessingResult(boolean breakOnErrorHit, boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
-        this.lastPartition = lastPartition;
-        this.partitionLastOffset = partitionLastOffset;
         this.failed = failed;
     }
 
@@ -42,14 +34,6 @@ public final class ProcessingResult {
         return breakOnErrorHit;
     }
 
-    public long getPartitionLastOffset() {
-        return partitionLastOffset;
-    }
-
-    public long getPartition() {
-        return lastPartition;
-    }
-
     public boolean isFailed() {
         return failed;
     }