You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/25 12:02:24 UTC

(camel) branch main updated: CAMEL-19241: implement support for auto-commits with batching in camel-kafka (#12879)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new af004d2698c CAMEL-19241: implement support for auto-commits with batching in camel-kafka (#12879)
af004d2698c is described below

commit af004d2698cfbccb728437b9830c25c7d3634f08
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Thu Jan 25 13:02:17 2024 +0100

    CAMEL-19241: implement support for auto-commits with batching in camel-kafka (#12879)
    
    * Added tests
    * Updated documentation
    * Improved error handling
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 90 +++++++++++++++++++++-
 .../component/kafka/consumer/CommitManagers.java   |  5 ++
 .../batching/KafkaRecordBatchingProcessor.java     | 88 ++++++++++++++++++---
 .../batching/BatchingProcessingITSupport.java      |  3 +-
 ...tchingProcessingAutoCommitErrorHandlingIT.java} | 54 +++++++------
 ...va => KafkaBatchingProcessingAutoCommitIT.java} | 36 +++------
 ... => KafkaBatchingProcessingManualCommitIT.java} | 10 +--
 7 files changed, 216 insertions(+), 70 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d8d8879bb4a..b91231aa4f4 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -496,20 +496,101 @@ static {
 
 === Batching Consumer
 
-To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true` and use manual commits.
+To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`.
+
 
 The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit individually
 every record or the whole batch at once by committing the last exchange on the list.
 
-When working with batch processing, it's up to the application to commit the records, and handle the outcome of potentially invalid records.
-
 The size of the batch is controlled by the option `maxPollRecords`.
 
 In order to avoid blocking for too long, waiting for the a whole set of records to fill the batch, it is is possible to use the `pollTimeoutMs` option to set a timeout for the polling. In this case, the batch may contain less messages than set in the `maxPollRecords`.
 
+==== Automatic Commits
+
+By default, Camel uses automatic commits when using batch processing. In this case, Camel automatically commits the records after they have been successfully processed by the application.
+
+In case of failures, the records will not be processed.
+
+The code below provides an example of this approach:
 [source,java]
 ----
-    from("kafka:topic?batching=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+public void configure() {
+    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
+        // The received records are stored as exchanges in a list. This gets the list of those exchanges
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
+        for (Object obj : exchanges) {
+            if (obj instanceof Exchange exchange) {
+                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
+            }
+        }
+    }).to(KafkaTestUtil.MOCK_RESULT);
+}
+----
+
+===== Handling Errors with Automatic Commits
+
+When using automatic commits, Camel will not commit records if there is a failure in processing. Because of this, there is a risk that records could be reprocessed multiple times.
+
+It is recommended to implement appropriate error handling mechanisms and patterns (i.e.; such as dead-letter queues) to prevent failed records from blocking processing progress.
+
+The code below provides an example of handling errors with automatic commits:
+
+[source,java]
+----
+public void configure() {
+    /*
+     We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
+     production scenario, applications should probably send these records to a separate topic or fix the condition
+     that lead to the failure
+     */
+    onException(IllegalArgumentException.class).process(exchange -> {
+        LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
+        LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
+    }).continued(true);
+
+    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
+        // The received records are stored as exchanges in a list. This gets the list of those exchanges
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        // The records from the batch are stored in a list of exchanges in the original exchange.
+        int i = 0;
+        for (Object o : exchanges) {
+            if (o instanceof Exchange exchange) {
+                i++;
+                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
+
+                if (i == 4) {
+                    throw new IllegalArgumentException("Failed to process record");
+                }
+            }
+        }
+    }).to(KafkaTestUtil.MOCK_RESULT);
+}
+----
+
+==== Manual Commits
+
+When working with batch processing with manual commits, it's up to the application to commit the records, and handle the outcome of potentially invalid records.
+
+The code below provides an example of this approach:
+
+[source,java]
+----
+public void configure() {
+    from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
     .process(e -> {
         // The received records are stored as exchanges in a list. This gets the list of those exchanges
         final List<?> exchanges = e.getMessage().getBody(List.class);
@@ -532,5 +613,6 @@ In order to avoid blocking for too long, waiting for the a whole set of records
             LOG.debug("Done performing manual commit");
         }
     });
+}
 ----
 include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
index 77791f3ba80..476435f0562 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
@@ -54,6 +54,11 @@ public final class CommitManagers {
                 LOG.debug("Using a commit-to-offset manager for commit management");
                 return new CommitToOffsetManager(consumer, kafkaConsumer, threadId, printableTopic);
             }
+
+            if (configuration.isBatching()) {
+                LOG.debug("Using an async commit manager for auto commit management with batch processing");
+                return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
+            }
         }
 
         LOG.debug("Using a NO-OP commit manager with auto-commit enabled on the Kafka consumer");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 66a3c48ea2b..082b80d7db9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.Synchronization;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -43,6 +44,44 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private final class CommitSynchronization implements Synchronization {
+        private final ExceptionHandler exceptionHandler;
+        private ProcessingResult result;
+
+        public CommitSynchronization(ExceptionHandler exceptionHandler) {
+            this.exceptionHandler = exceptionHandler;
+        }
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+            result = new ProcessingResult(false, false);
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            Exception cause = exchange.getException();
+            if (cause != null) {
+                exceptionHandler.handleException(
+                        "Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
+            } else {
+                LOG.warn(
+                        "Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
+            }
+
+            result = new ProcessingResult(false, true);
+        }
+    }
+
     public KafkaRecordBatchingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
         this.configuration = configuration;
         this.processor = processor;
@@ -58,18 +97,20 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
 
         propagateHeaders(configuration, consumerRecord, exchange);
 
-        // Batching is always in manual commit mode
-        KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
+        if (configuration.isAllowManualCommit()) {
+            KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
 
-        message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
+            message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
+        }
 
         return exchange;
     }
 
     public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
-
+        // Aggregate all consumer records in a single exchange
         List<Exchange> exchangeList = new ArrayList<>(consumerRecords.count());
 
+        // Create an inner exchange for every consumer record retrieved
         for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
             TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
             Exchange exchange = toExchange(camelKafkaConsumer, tp, consumerRecord);
@@ -77,10 +118,44 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
             exchangeList.add(exchange);
         }
 
+        // Create the bundle exchange
         final Exchange exchange = camelKafkaConsumer.createExchange(false);
         final Message message = exchange.getMessage();
         message.setBody(exchangeList);
 
+        try {
+            if (configuration.isAllowManualCommit()) {
+                return manualCommitResultProcessing(camelKafkaConsumer, exchange);
+            } else {
+                return autoCommitResultProcessing(camelKafkaConsumer, exchange);
+            }
+        } finally {
+            // Release the exchange
+            camelKafkaConsumer.releaseExchange(exchange, false);
+        }
+    }
+
+    /*
+     * The flow to execute when using auto-commit
+     */
+    private ProcessingResult autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
+        final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
+        final CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler);
+        exchange.getExchangeExtension().addOnCompletion(commitSynchronization);
+
+        try {
+            processor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        return commitSynchronization.result;
+    }
+
+    /*
+     * The flow to execute when the integrations perform manual commit on their own
+     */
+    private ProcessingResult manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
         try {
             processor.process(exchange);
         } catch (Exception e) {
@@ -90,7 +165,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
         ProcessingResult result;
         if (exchange.getException() != null) {
             LOG.debug("An exception was thrown for batch records");
-
             final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
             boolean handled = processException(exchange, exceptionHandler);
             result = new ProcessingResult(false, handled);
@@ -98,16 +172,12 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
             result = new ProcessingResult(false, false);
         }
 
-        // Release the exchange
-        camelKafkaConsumer.releaseExchange(exchange, false);
-
         return result;
     }
 
     private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) {
         // will handle/log the exception and then continue to next
         exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
-
         return true;
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
index 363efb0f1ea..f50aef3fd99 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -111,7 +112,7 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport
         final Object body = message.getBody();
         final List<?> list = assertInstanceOf(List.class, body, "The body should be a list");
 
-        //        assertEquals(expectedCount, list.size(), "The should be 5 messages on the list");
+        assertEquals(expectedCount, list.size(), "It should have received " + expectedCount + " instead of " + list.size());
 
         for (var object : list) {
             final Exchange exchange = assertInstanceOf(Exchange.class, object, "The list content should be an exchange");
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitErrorHandlingIT.java
similarity index 59%
copy from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
copy to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitErrorHandlingIT.java
index 17a1b15fa18..ece4b2e23af 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitErrorHandlingIT.java
@@ -14,14 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka.integration.batching;
 
 import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -29,11 +28,10 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingIT.class);
+public class KafkaBatchingProcessingAutoCommitErrorHandlingIT extends BatchingProcessingITSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingManualCommitIT.class);
 
-    public static final String TOPIC = "testManualCommitSyncTest";
-    private volatile boolean invalidExchange = false;
+    public static final String TOPIC = "testBatchingProcessingAutoCommitErrorHandling";
     private volatile boolean invalidExchangeFormat = false;
 
     @AfterEach
@@ -45,48 +43,54 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
     protected RouteBuilder createRouteBuilder() {
         // allowManualCommit=true&autoOffsetReset=earliest
         String from = "kafka:" + TOPIC
-                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true"
-                      + "&maxPollRecords=10&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory";
+                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest";
 
         return new RouteBuilder() {
 
             @Override
             public void configure() {
+
+                /*
+                 We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
+                 production scenario, applications should probably send these records to a separate topic or fix the condition
+                 that lead to the failure
+                 */
+                onException(IllegalArgumentException.class).process(exchange -> {
+                    LOG.warn("Failed to process batch: {}", exchange.getMessage().getBody());
+                    LOG.warn("Failed to process due to: {}",
+                            exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
+                }).continued(true);
+
                 from(from).routeId("batching").process(e -> {
                     // The received records are stored as exchanges in a list. This gets the list of those exchanges
                     final List<?> exchanges = e.getMessage().getBody(List.class);
 
                     // Ensure we are actually receiving what we are asking for
                     if (exchanges == null || exchanges.isEmpty()) {
-                        invalidExchange = true;
                         return;
                     }
 
-                    /*
-                    Every exchange in that list should contain a reference to the manual commit object. We use the reference
-                    for the last exchange in the list to commit the whole batch
-                     */
-                    final Object tmp = exchanges.get(exchanges.size() - 1);
-                    if (tmp instanceof Exchange exchange) {
-                        KafkaManualCommit manual
-                                = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
-                        LOG.debug("Performing manual commit");
-                        manual.commit();
-                        LOG.debug("Done performing manual commit");
-                    } else {
-                        invalidExchangeFormat = true;
-                    }
+                    // The records from the batch are stored in a list of exchanges in the original exchange.
+                    int i = 0;
+                    for (Object o : exchanges) {
+                        if (o instanceof Exchange exchange) {
+                            i++;
+                            LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
 
+                            if (i == 4) {
+                                throw new IllegalArgumentException("Failed to process record");
+                            }
+                        }
+                    }
                 }).to(KafkaTestUtil.MOCK_RESULT);
             }
         };
     }
 
     @Test
-    public void kafkaManualCommit() throws Exception {
+    public void kafkaAutoCommit() throws Exception {
         kafkaManualCommitTest(TOPIC);
 
         Assertions.assertFalse(invalidExchangeFormat, "The exchange list should be composed of exchanges");
     }
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitIT.java
similarity index 62%
copy from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
copy to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitIT.java
index 17a1b15fa18..dd07c235aef 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingAutoCommitIT.java
@@ -14,14 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka.integration.batching;
 
 import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -29,11 +28,10 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingIT.class);
+public class KafkaBatchingProcessingAutoCommitIT extends BatchingProcessingITSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingManualCommitIT.class);
 
-    public static final String TOPIC = "testManualCommitSyncTest";
-    private volatile boolean invalidExchange = false;
+    public static final String TOPIC = "testBatchingProcessingAutoCommit";
     private volatile boolean invalidExchangeFormat = false;
 
     @AfterEach
@@ -45,8 +43,7 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
     protected RouteBuilder createRouteBuilder() {
         // allowManualCommit=true&autoOffsetReset=earliest
         String from = "kafka:" + TOPIC
-                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true"
-                      + "&maxPollRecords=10&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory";
+                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest";
 
         return new RouteBuilder() {
 
@@ -58,35 +55,24 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
 
                     // Ensure we are actually receiving what we are asking for
                     if (exchanges == null || exchanges.isEmpty()) {
-                        invalidExchange = true;
                         return;
                     }
 
-                    /*
-                    Every exchange in that list should contain a reference to the manual commit object. We use the reference
-                    for the last exchange in the list to commit the whole batch
-                     */
-                    final Object tmp = exchanges.get(exchanges.size() - 1);
-                    if (tmp instanceof Exchange exchange) {
-                        KafkaManualCommit manual
-                                = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
-                        LOG.debug("Performing manual commit");
-                        manual.commit();
-                        LOG.debug("Done performing manual commit");
-                    } else {
-                        invalidExchangeFormat = true;
+                    // The records from the batch are stored in a list of exchanges in the original exchange.
+                    for (Object o : exchanges) {
+                        if (o instanceof Exchange exchange) {
+                            LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
+                        }
                     }
-
                 }).to(KafkaTestUtil.MOCK_RESULT);
             }
         };
     }
 
     @Test
-    public void kafkaManualCommit() throws Exception {
+    public void kafkaAutoCommit() throws Exception {
         kafkaManualCommitTest(TOPIC);
 
         Assertions.assertFalse(invalidExchangeFormat, "The exchange list should be composed of exchanges");
     }
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingManualCommitIT.java
similarity index 92%
rename from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
rename to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingManualCommitIT.java
index 17a1b15fa18..f6a803bc2dd 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingManualCommitIT.java
@@ -29,11 +29,10 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingIT.class);
+public class KafkaBatchingProcessingManualCommitIT extends BatchingProcessingITSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingManualCommitIT.class);
 
-    public static final String TOPIC = "testManualCommitSyncTest";
-    private volatile boolean invalidExchange = false;
+    public static final String TOPIC = "testBatchingProcessingManualCommit";
     private volatile boolean invalidExchangeFormat = false;
 
     @AfterEach
@@ -45,7 +44,7 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
     protected RouteBuilder createRouteBuilder() {
         // allowManualCommit=true&autoOffsetReset=earliest
         String from = "kafka:" + TOPIC
-                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true"
+                      + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true&allowManualCommit=true"
                       + "&maxPollRecords=10&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory";
 
         return new RouteBuilder() {
@@ -58,7 +57,6 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
 
                     // Ensure we are actually receiving what we are asking for
                     if (exchanges == null || exchanges.isEmpty()) {
-                        invalidExchange = true;
                         return;
                     }