Posted to commits@camel.apache.org by or...@apache.org on 2024/01/17 12:16:17 UTC

(camel) branch main updated: CAMEL-16064: preliminary refactoring of camel-kafka (#12820)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8232b52fa89 CAMEL-16064: preliminary refactoring of camel-kafka (#12820)
8232b52fa89 is described below

commit 8232b52fa8944f286283c32c97f86cefc81265b4
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Jan 17 13:16:11 2024 +0100

    CAMEL-16064: preliminary refactoring of camel-kafka (#12820)
    
    This reorganizes streaming-related processing code to simplify the implementation of the batch processing code
---
 .../camel/component/kafka/KafkaFetchRecords.java   |   3 +-
 .../AbstractKafkaRecordProcessorFacade.java        |  90 ++++++++++++
 .../consumer/support/KafkaRecordProcessor.java     | 159 +--------------------
 .../support/KafkaRecordProcessorFacade.java        | 142 ++----------------
 .../kafka/consumer/support/ProcessingResult.java   |  11 +-
 .../classic/OffsetPartitionAssignmentAdapter.java  |  11 +-
 .../KafkaRecordStreamingProcessor.java}            |  73 +++++-----
 .../KafkaRecordStreamingProcessorFacade.java}      | 108 ++++++--------
 .../kafka/KafkaIdempotentRepository.java           |   4 +-
 9 files changed, 208 insertions(+), 393 deletions(-)
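
The net effect of the changes below is to split the old concrete
KafkaRecordProcessorFacade into an interface plus an abstract base class, with
the existing per-record behavior moved into a new "streaming" package. A
simplified sketch of the resulting hierarchy, derived from the diff below
(method bodies elided):

    public interface KafkaRecordProcessorFacade {
        ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords);
    }

    public abstract class AbstractKafkaRecordProcessorFacade implements KafkaRecordProcessorFacade {
        // shared state: camelKafkaConsumer, threadId, commitManager, consumerListener
        // shared helpers: isStopping(), logRecords(), logRecordsInPartition(), logRecord()
    }

    public class KafkaRecordStreamingProcessorFacade extends AbstractKafkaRecordProcessorFacade {
        // per-record ("streaming") processing, delegating to KafkaRecordStreamingProcessor
    }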

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2c32e727a50..f0e84f4e41b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -31,6 +31,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFac
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
 import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
 import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
+import org.apache.camel.component.kafka.consumer.support.streaming.KafkaRecordStreamingProcessorFacade;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.task.ForegroundTask;
 import org.apache.camel.support.task.Tasks;
@@ -320,7 +321,7 @@ public class KafkaFetchRecords implements Runnable {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
 
-            KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
+            KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordStreamingProcessorFacade(
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java
new file mode 100644
index 00000000000..96706a6a6a2
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.util.List;
+
+import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common code for processing consumer records retrieved from Kafka
+ */
+public abstract class AbstractKafkaRecordProcessorFacade implements KafkaRecordProcessorFacade {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaRecordProcessorFacade.class);
+    protected final KafkaConsumer camelKafkaConsumer;
+    protected final String threadId;
+    protected final CommitManager commitManager;
+    protected final KafkaConsumerListener consumerListener;
+
+    protected AbstractKafkaRecordProcessorFacade(
+                                                 KafkaConsumer camelKafkaConsumer, String threadId, CommitManager commitManager,
+                                                 KafkaConsumerListener consumerListener) {
+        this.camelKafkaConsumer = camelKafkaConsumer;
+        this.threadId = threadId;
+        this.commitManager = commitManager;
+        this.consumerListener = consumerListener;
+    }
+
+    /**
+     * Whether the Camel consumer is stopping
+     *
+     * @return true if it is stopping, false otherwise
+     */
+    protected boolean isStopping() {
+        return camelKafkaConsumer.isStopping();
+    }
+
+    /**
+     * Utility to log record information along with the partition
+     *
+     * @param partitionRecords records from the partition
+     * @param partition        topic/partition information
+     */
+    protected void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
+                    partition);
+        }
+    }
+
+    /**
+     * Utility to log record information
+     *
+     * @param allRecords records retrieved from Kafka
+     */
+    protected void logRecords(ConsumerRecords<Object, Object> allRecords) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
+        }
+    }
+
+    protected void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
+                    consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
+        }
+    }
+
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index b06e53493e6..121c250b815 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -14,161 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka.consumer.support;
 
-import java.util.stream.StreamSupport;
+package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.CommitManager;
-import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
-import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
-import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaRecordProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
-
-    private final boolean autoCommitEnabled;
-    private final KafkaConfiguration configuration;
-    private final Processor processor;
-    private final CommitManager commitManager;
-
-    public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
-        this.autoCommitEnabled = configuration.isAutoCommitEnable();
-        this.configuration = configuration;
-        this.processor = processor;
-        this.commitManager = commitManager;
-    }
-
-    private void setupExchangeMessage(Message message, ConsumerRecord record) {
-        message.setHeader(KafkaConstants.PARTITION, record.partition());
-        message.setHeader(KafkaConstants.TOPIC, record.topic());
-        message.setHeader(KafkaConstants.OFFSET, record.offset());
-        message.setHeader(KafkaConstants.HEADERS, record.headers());
-        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
-        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
-
-        if (record.key() != null) {
-            message.setHeader(KafkaConstants.KEY, record.key());
-        }
-
-        LOG.debug("Setting up the exchange for message from partition {} and offset {}",
-                record.partition(), record.offset());
-
-        message.setBody(record.value());
-    }
-
-    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
-        return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
-    }
-
-    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
-
-        HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
-        KafkaHeaderDeserializer headerDeserializer = configuration.getHeaderDeserializer();
-
-        StreamSupport.stream(record.headers().spliterator(), false)
-                .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
-                .forEach(header -> exchange.getIn().setHeader(header.key(),
-                        headerDeserializer.deserialize(header.key(), header.value())));
-    }
-
-    public ProcessingResult processExchange(
-            Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
-
-        Message message = exchange.getMessage();
-
-        setupExchangeMessage(message, record);
-
-        propagateHeaders(record, exchange);
-
-        // if not auto commit then we have additional information on the exchange
-        if (!autoCommitEnabled) {
-            message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext);
-            message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
-        }
-
-        if (configuration.isAllowManualCommit()) {
-            // allow Camel users to access the Kafka consumer API to be able to do for example manual commits
-            KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, record);
-
-            message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-            message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
-        }
-
-        try {
-            processor.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-
-        ProcessingResult result = ProcessingResult.newUnprocessed();
-        if (exchange.getException() != null) {
-            LOG.debug("An exception was thrown for record at partition {} and offset {}",
-                    record.partition(), record.offset());
-
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
-            result = new ProcessingResult(breakOnErrorExit, true);
-        } else {
-            result = new ProcessingResult(false, exchange.getException() != null);
-        }
-
-        if (!result.isBreakOnErrorHit()) {
-            commitManager.recordOffset(topicPartition, record.offset());
-        }
-
-        return result;
-    }
-
-    private boolean processException(
-            Exchange exchange, TopicPartition topicPartition,
-            ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
-
-        // processing failed due to an unhandled exception, what should we do
-        if (configuration.isBreakOnFirstError()) {
-            // we are failing and we should break out
-            if (LOG.isWarnEnabled()) {
-                Exception exc = exchange.getException();
-                LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
-                        exc.getMessage());
-                LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-                        consumerRecord.offset(), consumerRecord.partition());
-            }
-
-            // we should just do a commit (vs the original forceCommit)
-            // when route uses NOOP Commit Manager it will rely
-            // on the route implementation to explicitly commit offset
-            // when route uses Synch/Asynch Commit Manager it will
-            // ALWAYS commit the offset for the failing record
-            // and will ALWAYS retry it
-            commitManager.commit(topicPartition);
-
-            // continue to next partition
-            return true;
-        } else {
-            // will handle/log the exception and then continue to next
-            exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
-        }
-
-        return false;
-    }
-
-    public static String serializeOffsetKey(TopicPartition topicPartition) {
-        return topicPartition.topic() + '/' + topicPartition.partition();
-    }
 
-    public static long deserializeOffsetValue(String offset) {
-        return Long.parseLong(offset);
-    }
+public interface KafkaRecordProcessor<T> {
+    ProcessingResult processExchange(
+            KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext,
+            boolean recordHasNext, T processable);
 }
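
The type parameter T is what makes room for the batch work mentioned in the
commit message: the streaming implementation later in this commit binds T to a
single ConsumerRecord, while a batch processor could bind it to a whole list of
records. A hypothetical sketch of such a binding (no batch processor is part of
this commit; the class name is invented for illustration):

    import java.util.List;

    import org.apache.camel.component.kafka.KafkaConsumer;
    import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
    import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical batch-oriented processor: T is a whole list of records.
    final class KafkaRecordBatchProcessor
            implements KafkaRecordProcessor<List<ConsumerRecord<Object, Object>>> {

        @Override
        public ProcessingResult processExchange(
                KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext,
                boolean recordHasNext, List<ConsumerRecord<Object, Object>> processable) {
            // A real implementation would build one exchange carrying the whole
            // batch, run it through the route, then record/commit offsets once.
            return new ProcessingResult(false, false);
        }
    }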
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 8965a1037c4..41a747b0ef9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -17,137 +17,17 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaConsumer;
-import org.apache.camel.component.kafka.consumer.CommitManager;
-import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaRecordProcessorFacade {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
-
-    private final KafkaConsumer camelKafkaConsumer;
-    private final String threadId;
-    private final KafkaRecordProcessor kafkaRecordProcessor;
-    private final CommitManager commitManager;
-    private final KafkaConsumerListener consumerListener;
-
-    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
-                                      CommitManager commitManager, KafkaConsumerListener consumerListener) {
-        this.camelKafkaConsumer = camelKafkaConsumer;
-        this.threadId = threadId;
-        this.commitManager = commitManager;
-
-        kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-        this.consumerListener = consumerListener;
-    }
-
-    private boolean isStopping() {
-        return camelKafkaConsumer.isStopping();
-    }
-
-    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
-        logRecords(allRecords);
-
-        ProcessingResult result = ProcessingResult.newUnprocessed();
-
-        Set<TopicPartition> partitions = allRecords.partitions();
-        Iterator<TopicPartition> partitionIterator = partitions.iterator();
-
-        LOG.debug("Poll received records on {} partitions", partitions.size());
-
-        while (partitionIterator.hasNext() && !isStopping()) {
-            TopicPartition partition = partitionIterator.next();
-
-            LOG.debug("Processing records on partition {}", partition.partition());
-
-            List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
-            Iterator<ConsumerRecord<Object, Object>> recordIterator = partitionRecords.iterator();
-
-            logRecordsInPartition(partitionRecords, partition);
-
-            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
-                ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
-
-                LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
-                        consumerRecord.offset());
 
-                result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
-                        kafkaRecordProcessor, consumerRecord);
-
-                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(),
-                        consumerRecord.offset());
-
-                if (consumerListener != null) {
-                    if (!consumerListener.afterProcess(result)) {
-                        commitManager.commit(partition);
-                        return result;
-                    }
-                }
-            }
-
-            if (!result.isBreakOnErrorHit()) {
-                LOG.debug("Committing offset on successful execution");
-                // all records processed from partition so commit them
-                commitManager.commit(partition);
-            }
-        }
-
-        return result;
-    }
-
-    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
-                    partition);
-        }
-    }
-
-    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
-        }
-    }
-
-    private ProcessingResult processRecord(
-            TopicPartition partition,
-            boolean partitionHasNext,
-            boolean recordHasNext,
-            KafkaRecordProcessor kafkaRecordProcessor,
-            ConsumerRecord<Object, Object> consumerRecord) {
-
-        logRecord(consumerRecord);
-
-        Exchange exchange = camelKafkaConsumer.createExchange(false);
-
-        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
-
-        // success so release the exchange
-        camelKafkaConsumer.releaseExchange(exchange, false);
-
-        return result;
-    }
-
-    private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
-                    consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
-        }
-    }
-
-    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
-        return new KafkaRecordProcessor(
-                camelKafkaConsumer.getEndpoint().getConfiguration(),
-                camelKafkaConsumer.getProcessor(),
-                commitManager);
-    }
+/**
+ * A processing facade that allows processing consumer records in different ways
+ */
+public interface KafkaRecordProcessorFacade {
+    /**
+     * Sends a set of records polled from Kafka for processing
+     *
+     * @param  allRecords All records received from a call to the Kafka consumer's poll method
+     * @return            The result of processing this set of records
+     */
+    ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords);
 }
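
For reference, the facade is driven from KafkaFetchRecords as shown in the
first hunk of this commit. A condensed sketch of that call site (the poll call
itself is outside the diff and is an assumption here):

    KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordStreamingProcessorFacade(
            kafkaConsumer, threadId, commitManager, consumerListener);

    while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
        // assumed: poll the broker for the next batch of records
        ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
        ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
    }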
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 1728616cf9e..4be6b8ba7eb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
+/**
+ * Holds the result of processing one or more consumer records
+ */
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
             = new ProcessingResult(false, false);
@@ -24,7 +27,13 @@ public final class ProcessingResult {
     private final boolean breakOnErrorHit;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, boolean failed) {
+    /**
+     * Constructs a new processing result
+     *
+     * @param breakOnErrorHit break on error hit setting
+     * @param failed          whether processing has failed
+     */
+    public ProcessingResult(boolean breakOnErrorHit, boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
         this.failed = failed;
     }
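
Widening the constructor from package-private to public lets implementations
outside this package (such as the new streaming package) build results.
Illustrative use, based on the members visible in this diff:

    ProcessingResult unprocessed = ProcessingResult.newUnprocessed(); // shared "nothing processed yet" value
    ProcessingResult failure = new ProcessingResult(true, true);      // break out of the poll loop, mark failed
    if (!failure.isBreakOnErrorHit()) {
        // offsets would be recorded here, mirroring the record processor
    }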
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
index 4227ad7f004..a851659b815 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
@@ -24,9 +24,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
-
 /**
  * A resume strategy that uses Kafka's offset for resuming
  */
@@ -62,4 +59,12 @@ public class OffsetPartitionAssignmentAdapter implements PartitionAssignmentAdap
             }
         }
     }
+
+    public static String serializeOffsetKey(TopicPartition topicPartition) {
+        return topicPartition.topic() + '/' + topicPartition.partition();
+    }
+
+    public static long deserializeOffsetValue(String offset) {
+        return Long.parseLong(offset);
+    }
 }
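
The two helpers move from KafkaRecordProcessor into the class that previously
imported them statically. Their round trip, for illustration (topic name and
offset are made up):

    TopicPartition tp = new TopicPartition("my-topic", 3);
    String key = OffsetPartitionAssignmentAdapter.serializeOffsetKey(tp);        // "my-topic/3"
    long offset = OffsetPartitionAssignmentAdapter.deserializeOffsetValue("42"); // 42L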
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
similarity index 71%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
index b06e53493e6..2cf0ca267a3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.streaming;
 
 import java.util.stream.StreamSupport;
 
@@ -23,8 +23,11 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -34,64 +37,67 @@ import org.apache.kafka.common.header.Header;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaRecordProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
+final class KafkaRecordStreamingProcessor implements KafkaRecordProcessor<ConsumerRecord<Object, Object>> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessor.class);
 
     private final boolean autoCommitEnabled;
     private final KafkaConfiguration configuration;
     private final Processor processor;
     private final CommitManager commitManager;
 
-    public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
+    public KafkaRecordStreamingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
         this.autoCommitEnabled = configuration.isAutoCommitEnable();
         this.configuration = configuration;
         this.processor = processor;
         this.commitManager = commitManager;
     }
 
-    private void setupExchangeMessage(Message message, ConsumerRecord record) {
-        message.setHeader(KafkaConstants.PARTITION, record.partition());
-        message.setHeader(KafkaConstants.TOPIC, record.topic());
-        message.setHeader(KafkaConstants.OFFSET, record.offset());
-        message.setHeader(KafkaConstants.HEADERS, record.headers());
-        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
-        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
+    private void setupExchangeMessage(Message message, ConsumerRecord<?, ?> consumerRecord) {
+        message.setHeader(KafkaConstants.PARTITION, consumerRecord.partition());
+        message.setHeader(KafkaConstants.TOPIC, consumerRecord.topic());
+        message.setHeader(KafkaConstants.OFFSET, consumerRecord.offset());
+        message.setHeader(KafkaConstants.HEADERS, consumerRecord.headers());
+        message.setHeader(KafkaConstants.TIMESTAMP, consumerRecord.timestamp());
+        message.setHeader(Exchange.MESSAGE_TIMESTAMP, consumerRecord.timestamp());
 
-        if (record.key() != null) {
-            message.setHeader(KafkaConstants.KEY, record.key());
+        if (consumerRecord.key() != null) {
+            message.setHeader(KafkaConstants.KEY, consumerRecord.key());
         }
 
         LOG.debug("Setting up the exchange for message from partition {} and offset {}",
-                record.partition(), record.offset());
+                consumerRecord.partition(), consumerRecord.offset());
 
-        message.setBody(record.value());
+        message.setBody(consumerRecord.value());
     }
 
     private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
         return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
     }
 
-    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
+    private void propagateHeaders(ConsumerRecord<Object, Object> consumerRecord, Exchange exchange) {
 
         HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
         KafkaHeaderDeserializer headerDeserializer = configuration.getHeaderDeserializer();
 
-        StreamSupport.stream(record.headers().spliterator(), false)
+        StreamSupport.stream(consumerRecord.headers().spliterator(), false)
                 .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
                 .forEach(header -> exchange.getIn().setHeader(header.key(),
                         headerDeserializer.deserialize(header.key(), header.value())));
     }
 
+    @Override
     public ProcessingResult processExchange(
-            Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
+            KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext,
+            boolean recordHasNext, ConsumerRecord<Object, Object> consumerRecord) {
+
+        final Exchange exchange = camelKafkaConsumer.createExchange(false);
+        final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
 
         Message message = exchange.getMessage();
 
-        setupExchangeMessage(message, record);
+        setupExchangeMessage(message, consumerRecord);
 
-        propagateHeaders(record, exchange);
+        propagateHeaders(consumerRecord, exchange);
 
         // if not auto commit then we have additional information on the exchange
         if (!autoCommitEnabled) {
@@ -101,7 +107,7 @@ public class KafkaRecordProcessor {
 
         if (configuration.isAllowManualCommit()) {
             // allow Camel users to access the Kafka consumer API to be able to do for example manual commits
-            KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, record);
+            KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
 
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
             message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
@@ -113,21 +119,24 @@ public class KafkaRecordProcessor {
             exchange.setException(e);
         }
 
-        ProcessingResult result = ProcessingResult.newUnprocessed();
+        ProcessingResult result;
         if (exchange.getException() != null) {
-            LOG.debug("An exception was thrown for record at partition {} and offset {}",
-                    record.partition(), record.offset());
+            LOG.debug("An exception was thrown for consumerRecord at partition {} and offset {}",
+                    consumerRecord.partition(), consumerRecord.offset());
 
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+            boolean breakOnErrorExit = processException(exchange, topicPartition, consumerRecord, exceptionHandler);
             result = new ProcessingResult(breakOnErrorExit, true);
         } else {
             result = new ProcessingResult(false, exchange.getException() != null);
         }
 
         if (!result.isBreakOnErrorHit()) {
-            commitManager.recordOffset(topicPartition, record.offset());
+            commitManager.recordOffset(topicPartition, consumerRecord.offset());
         }
 
+        // success so release the exchange
+        camelKafkaConsumer.releaseExchange(exchange, false);
+
         return result;
     }
 
@@ -163,12 +172,4 @@ public class KafkaRecordProcessor {
 
         return false;
     }
-
-    public static String serializeOffsetKey(TopicPartition topicPartition) {
-        return topicPartition.topic() + '/' + topicPartition.partition();
-    }
-
-    public static long deserializeOffsetValue(String offset) {
-        return Long.parseLong(offset);
-    }
 }
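
processExchange() keeps the existing contract of exposing a KafkaManualCommit
on the message when allowManualCommit is enabled. A route-side sketch of that
contract, inside a RouteBuilder (endpoint URI and topic name are illustrative):

    from("kafka:my-topic?allowManualCommit=true&autoCommitEnable=false")
        .process(exchange -> {
            KafkaManualCommit manual = exchange.getMessage()
                    .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            manual.commit(); // explicitly commit the offset for this record
        });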
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
similarity index 59%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
index 8965a1037c4..0e6aa6ace08 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
@@ -15,45 +15,65 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.streaming;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.consumer.support.AbstractKafkaRecordProcessorFacade;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaRecordProcessorFacade {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
+public class KafkaRecordStreamingProcessorFacade extends AbstractKafkaRecordProcessorFacade {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessorFacade.class);
+    private final KafkaRecordStreamingProcessor kafkaRecordProcessor;
 
-    private final KafkaConsumer camelKafkaConsumer;
-    private final String threadId;
-    private final KafkaRecordProcessor kafkaRecordProcessor;
-    private final CommitManager commitManager;
-    private final KafkaConsumerListener consumerListener;
-
-    public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
-                                      CommitManager commitManager, KafkaConsumerListener consumerListener) {
-        this.camelKafkaConsumer = camelKafkaConsumer;
-        this.threadId = threadId;
-        this.commitManager = commitManager;
+    public KafkaRecordStreamingProcessorFacade(
+                                               KafkaConsumer camelKafkaConsumer, String threadId,
+                                               CommitManager commitManager, KafkaConsumerListener consumerListener) {
+        super(camelKafkaConsumer, threadId, commitManager, consumerListener);
 
         kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-        this.consumerListener = consumerListener;
+
     }
 
-    private boolean isStopping() {
-        return camelKafkaConsumer.isStopping();
+    private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
+        return new KafkaRecordStreamingProcessor(
+                camelKafkaConsumer.getEndpoint().getConfiguration(),
+                camelKafkaConsumer.getProcessor(),
+                commitManager);
+    }
+
+    /**
+     * Processes a single record retrieved from Kafka
+     *
+     * @param  partition            the partition
+     * @param  partitionHasNext     whether there are more partitions to process
+     * @param  recordHasNext        whether more records remain to be processed in that partition
+     * @param  kafkaRecordProcessor the record processor
+     * @param  consumerRecord       the consumer record retrieved from Kafka to process
+     * @return                      The result of processing this record
+     */
+    private ProcessingResult processRecord(
+            TopicPartition partition, boolean partitionHasNext, boolean recordHasNext,
+            KafkaRecordStreamingProcessor kafkaRecordProcessor,
+            ConsumerRecord<Object, Object> consumerRecord) {
+
+        logRecord(consumerRecord);
+
+        return kafkaRecordProcessor.processExchange(camelKafkaConsumer, partition, partitionHasNext,
+                recordHasNext, consumerRecord);
     }
 
+    @Override
     public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
 
@@ -77,13 +97,15 @@ public class KafkaRecordProcessorFacade {
             while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
                 ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
 
-                LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
+                LOG.debug("Processing record on partition {} with offset {}",
+                        consumerRecord.partition(),
                         consumerRecord.offset());
 
                 result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, consumerRecord);
 
-                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(),
+                LOG.debug("Processed record on partition {} with offset {}",
+                        consumerRecord.partition(),
                         consumerRecord.offset());
 
                 if (consumerListener != null) {
@@ -104,50 +126,4 @@ public class KafkaRecordProcessorFacade {
         return result;
     }
 
-    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
-                    partition);
-        }
-    }
-
-    private void logRecords(ConsumerRecords<Object, Object> allRecords) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
-        }
-    }
-
-    private ProcessingResult processRecord(
-            TopicPartition partition,
-            boolean partitionHasNext,
-            boolean recordHasNext,
-            KafkaRecordProcessor kafkaRecordProcessor,
-            ConsumerRecord<Object, Object> consumerRecord) {
-
-        logRecord(consumerRecord);
-
-        Exchange exchange = camelKafkaConsumer.createExchange(false);
-
-        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
-
-        // success so release the exchange
-        camelKafkaConsumer.releaseExchange(exchange, false);
-
-        return result;
-    }
-
-    private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
-                    consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
-        }
-    }
-
-    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
-        return new KafkaRecordProcessor(
-                camelKafkaConsumer.getEndpoint().getConfiguration(),
-                camelKafkaConsumer.getProcessor(),
-                commitManager);
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index f3dc6f0eaeb..226fb1209b4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -306,8 +306,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     /**
      * Sets the group id of the Kafka consumer.
      *
-     * @param groupId The poll duration in milliseconds.
-     * @deprecated The parameter groupId is ignored.
+     * @param      groupId The group id of the Kafka consumer.
+     * @deprecated         The parameter groupId is ignored.
      */
     @Deprecated
     public void setGroupId(String groupId) {