You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/30 07:17:29 UTC

[camel] 03/03: CAMEL-17802: rework the commit management

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 63009a92fb5d54e78d070d4b4d18bc07e2f70de6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 29 14:36:11 2022 +0200

    CAMEL-17802: rework the commit management
    
    Adjust the commit management code to account for the fact that the Kafka consumer will auto-commit on close.
    Also align the commit factories to match the commit management policy of their respective commit managers.
---
 .../org/apache/camel/catalog/components/kafka.json |   2 -
 .../component/kafka/KafkaEndpointUriFactory.java   | 174 ++++++++++-----------
 .../camel/component/kafka/KafkaComponent.java      |   2 +-
 .../camel/component/kafka/KafkaFetchRecords.java   |   2 -
 .../kafka/consumer/AbstractCommitManager.java      |   4 +
 .../kafka/consumer/AsyncCommitManager.java         |   4 +
 .../component/kafka/consumer/CommitManagers.java   |  37 +++--
 .../consumer/DefaultKafkaManualAsyncCommit.java    |   2 +-
 .../consumer/DefaultKafkaManualSyncCommit.java     |   4 +-
 .../kafka/consumer/NoopCommitManager.java          |   6 +-
 ...mitIT.java => BaseManualCommitTestSupport.java} |  96 ++----------
 .../KafkaConsumerManualAsyncCommitIT.java          |  66 ++++++++
 ...T.java => KafkaConsumerManualNoopCommitIT.java} |  72 +--------
 .../KafkaConsumerManualSyncCommitIT.java           |  66 ++++++++
 .../dsl/KafkaComponentBuilderFactory.java          |  20 ---
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  19 ---
 16 files changed, 277 insertions(+), 299 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 8e567ff..f3ed143 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -32,7 +32,6 @@
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then [...]
     "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consum [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." },
-    "autoCommitOnStop": { "kind": "property", "displayName": "Auto Commit On Stop", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sync", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to perform an explicit auto commit whe [...]
     "autoOffsetReset": { "kind": "property", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do when there is no initial offset [...]
     "breakOnFirstError": { "kind": "property", "displayName": "Break On First Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This options controls what happens when a consumer is processing an exchange and it fails. [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
@@ -141,7 +140,6 @@
     "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled the [...]
     "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consu [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." },
-    "autoCommitOnStop": { "kind": "parameter", "displayName": "Auto Commit On Stop", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sync", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to perform an explicit auto commit wh [...]
     "autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do when there is no initial offse [...]
     "breakOnFirstError": { "kind": "parameter", "displayName": "Break On First Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This options controls what happens when a consumer is processing an exchange and it fails [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...]
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 0b40860..6bbceda 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -22,113 +22,113 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
         Set<String> props = new HashSet<>(101);
-        props.add("synchronous");
-        props.add("queueBufferingMaxMessages");
+        props.add("additionalProperties");
         props.add("allowManualCommit");
-        props.add("consumersCount");
-        props.add("receiveBufferBytes");
-        props.add("reconnectBackoffMaxMs");
-        props.add("valueDeserializer");
-        props.add("metricReporters");
-        props.add("sslTruststoreType");
-        props.add("sendBufferBytes");
-        props.add("heartbeatIntervalMs");
-        props.add("interceptorClasses");
-        props.add("sslKeystoreType");
+        props.add("autoCommitEnable");
+        props.add("autoCommitIntervalMs");
+        props.add("autoOffsetReset");
         props.add("breakOnFirstError");
-        props.add("requestRequiredAcks");
-        props.add("enableIdempotence");
-        props.add("pollOnError");
-        props.add("fetchWaitMaxMs");
-        props.add("retries");
-        props.add("maxPollRecords");
-        props.add("additionalProperties");
-        props.add("keyDeserializer");
-        props.add("producerBatchSize");
-        props.add("retryBackoffMs");
+        props.add("bridgeErrorHandler");
         props.add("brokers");
-        props.add("metricsSampleWindowMs");
-        props.add("sslContextParameters");
-        props.add("sslKeyPassword");
-        props.add("keySerializer");
-        props.add("noOfMetricsSample");
-        props.add("maxPartitionFetchBytes");
-        props.add("partitionKey");
-        props.add("headerFilterStrategy");
-        props.add("sslTruststorePassword");
-        props.add("sessionTimeoutMs");
-        props.add("key");
-        props.add("topicIsPattern");
-        props.add("sslTruststoreLocation");
+        props.add("bufferMemorySize");
+        props.add("checkCrcs");
         props.add("clientId");
-        props.add("maxRequestSize");
-        props.add("recordMetadata");
-        props.add("sslTrustmanagerAlgorithm");
-        props.add("compressionCodec");
         props.add("commitTimeoutMs");
-        props.add("workerPoolCoreSize");
-        props.add("autoCommitEnable");
+        props.add("compressionCodec");
+        props.add("connectionMaxIdleMs");
         props.add("consumerRequestTimeoutMs");
-        props.add("maxPollIntervalMs");
-        props.add("kerberosInitCmd");
-        props.add("workerPoolMaxSize");
-        props.add("reconnectBackoffMs");
+        props.add("consumersCount");
+        props.add("deliveryTimeoutMs");
+        props.add("enableIdempotence");
+        props.add("exceptionHandler");
+        props.add("exchangePattern");
+        props.add("fetchMaxBytes");
+        props.add("fetchMinBytes");
+        props.add("fetchWaitMaxMs");
         props.add("groupId");
-        props.add("offsetRepository");
+        props.add("groupInstanceId");
+        props.add("headerDeserializer");
+        props.add("headerFilterStrategy");
+        props.add("headerSerializer");
+        props.add("heartbeatIntervalMs");
+        props.add("interceptorClasses");
+        props.add("kafkaClientFactory");
+        props.add("kafkaManualCommitFactory");
+        props.add("kerberosBeforeReloginMinTime");
+        props.add("kerberosInitCmd");
+        props.add("kerberosPrincipalToLocalRules");
         props.add("kerberosRenewJitter");
-        props.add("sslProvider");
-        props.add("saslKerberosServiceName");
-        props.add("bridgeErrorHandler");
-        props.add("shutdownTimeout");
-        props.add("saslMechanism");
-        props.add("workerPool");
-        props.add("deliveryTimeoutMs");
+        props.add("kerberosRenewWindowFactor");
+        props.add("key");
+        props.add("keyDeserializer");
+        props.add("keySerializer");
         props.add("lazyStartProducer");
-        props.add("sslKeystorePassword");
-        props.add("kafkaManualCommitFactory");
-        props.add("sslEndpointAlgorithm");
-        props.add("topic");
-        props.add("sslProtocol");
-        props.add("sslKeymanagerAlgorithm");
-        props.add("pollTimeoutMs");
-        props.add("exceptionHandler");
+        props.add("lingerMs");
         props.add("maxBlockMs");
-        props.add("kerberosBeforeReloginMinTime");
-        props.add("groupInstanceId");
-        props.add("bufferMemorySize");
+        props.add("maxInFlightRequest");
+        props.add("maxPartitionFetchBytes");
+        props.add("maxPollIntervalMs");
+        props.add("maxPollRecords");
+        props.add("maxRequestSize");
         props.add("metadataMaxAgeMs");
-        props.add("sslCipherSuites");
-        props.add("specificAvroReader");
+        props.add("metricReporters");
+        props.add("metricsSampleWindowMs");
+        props.add("noOfMetricsSample");
+        props.add("offsetRepository");
+        props.add("partitionAssignor");
+        props.add("partitionKey");
+        props.add("partitioner");
+        props.add("pollOnError");
+        props.add("pollTimeoutMs");
+        props.add("producerBatchSize");
+        props.add("queueBufferingMaxMessages");
+        props.add("receiveBufferBytes");
+        props.add("reconnectBackoffMaxMs");
+        props.add("reconnectBackoffMs");
+        props.add("recordMetadata");
+        props.add("requestRequiredAcks");
+        props.add("requestTimeoutMs");
+        props.add("retries");
+        props.add("retryBackoffMs");
         props.add("saslJaasConfig");
-        props.add("fetchMinBytes");
-        props.add("connectionMaxIdleMs");
-        props.add("lingerMs");
-        props.add("kerberosRenewWindowFactor");
+        props.add("saslKerberosServiceName");
+        props.add("saslMechanism");
+        props.add("schemaRegistryURL");
         props.add("securityProtocol");
-        props.add("autoCommitIntervalMs");
-        props.add("partitioner");
-        props.add("kerberosPrincipalToLocalRules");
-        props.add("headerSerializer");
+        props.add("seekTo");
+        props.add("sendBufferBytes");
+        props.add("sessionTimeoutMs");
+        props.add("shutdownTimeout");
+        props.add("specificAvroReader");
+        props.add("sslCipherSuites");
+        props.add("sslContextParameters");
         props.add("sslEnabledProtocols");
+        props.add("sslEndpointAlgorithm");
+        props.add("sslKeyPassword");
+        props.add("sslKeymanagerAlgorithm");
         props.add("sslKeystoreLocation");
-        props.add("schemaRegistryURL");
-        props.add("headerDeserializer");
-        props.add("maxInFlightRequest");
-        props.add("exchangePattern");
+        props.add("sslKeystorePassword");
+        props.add("sslKeystoreType");
+        props.add("sslProtocol");
+        props.add("sslProvider");
+        props.add("sslTrustmanagerAlgorithm");
+        props.add("sslTruststoreLocation");
+        props.add("sslTruststorePassword");
+        props.add("sslTruststoreType");
+        props.add("synchronous");
+        props.add("topic");
+        props.add("topicIsPattern");
+        props.add("valueDeserializer");
         props.add("valueSerializer");
-        props.add("autoOffsetReset");
-        props.add("seekTo");
-        props.add("kafkaClientFactory");
-        props.add("requestTimeoutMs");
-        props.add("fetchMaxBytes");
-        props.add("checkCrcs");
-        props.add("partitionAssignor");
+        props.add("workerPool");
+        props.add("workerPoolCoreSize");
+        props.add("workerPoolMaxSize");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         Set<String> secretProps = new HashSet<>(4);
-        secretProps.add("sslKeystorePassword");
-        secretProps.add("sslTruststorePassword");
         secretProps.add("saslJaasConfig");
         secretProps.add("sslKeyPassword");
+        secretProps.add("sslKeystorePassword");
+        secretProps.add("sslTruststorePassword");
         SECRET_PROPERTY_NAMES = Collections.unmodifiableSet(secretProps);
         Set<String> prefixes = new HashSet<>(1);
         prefixes.add("additionalProperties.");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 348b60b..310913d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -226,7 +226,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         if (kafkaClientFactory == null) {
             kafkaClientFactory = new DefaultKafkaClientFactory();
         }
-        if (kafkaManualCommitFactory == null) {
+        if (configuration.isAllowManualCommit() && kafkaManualCommitFactory == null) {
             kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
         }
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 11b1121..9c41d1b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -512,8 +512,6 @@ class KafkaFetchRecords implements Runnable {
             consumer.wakeup();
             Thread.currentThread().interrupt();
         } finally {
-            IOHelper.close(consumer);
-
             if (lock.isHeldByCurrentThread()) {
                 lock.unlock();
             }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index 7923835..8333fa1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -73,6 +73,10 @@ public abstract class AbstractCommitManager implements CommitManager {
             Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record) {
 
         KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
+        if (manualCommitFactory == null) {
+            manualCommitFactory = new DefaultKafkaManualCommitFactory();
+        }
+
         return getManualCommit(exchange, partition, record, null, manualCommitFactory);
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
index 20838c0..a364b19 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
@@ -83,6 +83,10 @@ public class AsyncCommitManager extends AbstractCommitManager {
             Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record) {
 
         KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
+        if (manualCommitFactory == null) {
+            manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();
+        }
+
         return getManualCommit(exchange, partition, record, asyncCommits, manualCommitFactory);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
index 258e393..77791f3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
@@ -20,8 +20,12 @@ package org.apache.camel.component.kafka.consumer;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class CommitManagers {
+    private static final Logger LOG = LoggerFactory.getLogger(CommitManagers.class);
+
     private CommitManagers() {
     }
 
@@ -29,19 +33,30 @@ public final class CommitManagers {
             Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
         KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
 
-        if (!configuration.isAllowManualCommit() && configuration.getOffsetRepository() != null) {
-            return new CommitToOffsetManager(consumer, kafkaConsumer, threadId, printableTopic);
-        }
-
-        if (configuration.isAutoCommitEnable()) {
-            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
-        }
-
-        KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
-        if (manualCommitFactory instanceof DefaultKafkaManualAsyncCommitFactory) {
-            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
+        if (configuration.isAllowManualCommit()) {
+            LOG.debug("Allowing manual commit management");
+            KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
+            if (manualCommitFactory instanceof DefaultKafkaManualAsyncCommitFactory) {
+                LOG.debug("Using an async commit manager for manual commit management");
+                return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
+            } else {
+                if (manualCommitFactory instanceof DefaultKafkaManualCommitFactory) {
+                    LOG.debug("Using a sync commit manager for manual commit management");
+                    return new SyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
+                } else {
+                    // This has been the default behavior for Camel
+                    LOG.debug("Using an NO-OP commit manager for manual commit management");
+                    return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
+                }
+            }
+        } else {
+            if (configuration.getOffsetRepository() != null) {
+                LOG.debug("Using a commit-to-offset manager for commit management");
+                return new CommitToOffsetManager(consumer, kafkaConsumer, threadId, printableTopic);
+            }
         }
 
+        LOG.debug("Using a NO-OP commit manager with auto-commit enabled on the Kafka consumer");
         return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic);
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index d52454c..ddc3eb1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -49,7 +49,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl
             if (offsetRepository != null) {
                 offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset));
             } else {
-                LOG.debug("CommitAsync {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
+                LOG.debug("Commit async {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
                 camelExchangePayload.consumer.commitAsync(
                         Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
                         (offsets, exception) -> {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
index 2e2b0ba..1d5aed9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
@@ -44,11 +44,11 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple
             if (offsetRepository != null) {
                 offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset));
             } else {
-                LOG.debug("CommitSync {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
+                LOG.debug("Commit sync {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
                 camelExchangePayload.consumer.commitSync(
                         Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1)),
                         Duration.ofMillis(getCommitTimeout()));
-                LOG.debug("CommitSync done for {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
+                LOG.debug("Commit sync done for {} from topic {} with offset: {}", getThreadId(), getTopicName(), recordOffset);
             }
         }
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
index b03800d..05f4830 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/NoopCommitManager.java
@@ -32,21 +32,21 @@ public class NoopCommitManager extends AbstractCommitManager {
 
     @Override
     public void commit() {
-        LOG.info("Auto commit on stop {} from {} is disabled (none)", threadId, printableTopic);
+        LOG.info("Auto commit on {} from {} is enabled via Kafka consumer (NO-OP)", threadId, printableTopic);
 
     }
 
     @Override
     public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic());
+            LOG.debug("Auto commit on stop on {} from {} is enabled via Kafka consumer (NO-OP)", threadId, partition.topic());
         }
     }
 
     @Override
     public void commitOffset(TopicPartition partition, long partitionLastOffset) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic());
+            LOG.debug("Auto commit to offset {} from topic {} is disabled (NO-OP)", threadId, partition.topic());
         }
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
similarity index 51%
copy from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
copy to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
index e92a55e..28b8e0b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
@@ -14,116 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka.integration;
 
 import java.util.Collections;
 import java.util.Properties;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
-import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.TestInstance;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-@TestInstance(TestInstance.Lifecycle.PER_METHOD)
-public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
-
-    public static final String TOPIC = "testManualCommitTest";
 
-    @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
-                    + "&allowManualCommit=true&autoOffsetReset=earliest")
-    private Endpoint from;
+public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
 
     @EndpointInject("mock:result")
-    private MockEndpoint to;
+    protected MockEndpoint to;
 
     @EndpointInject("mock:resultBar")
-    private MockEndpoint toBar;
+    protected MockEndpoint toBar;
 
-    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+    protected org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
     @BeforeEach
-    public void before() {
+    public void createClient() {
         Properties props = getDefaultProperties();
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
     }
 
-    @AfterEach
-    public void after() {
+    public void cleanupKafka(String topic) {
         if (producer != null) {
             producer.close();
         }
         // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        kafkaAdminClient.deleteTopics(Collections.singletonList(topic));
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-
-            @Override
-            public void configure() {
-                from(from).routeId("foo").to(to).process(e -> {
-                    KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
-                    assertNotNull(manual);
-                    manual.commit();
-                });
-                from(from).routeId("bar").autoStartup(false).to(toBar);
-            }
-        };
-    }
-
-    @RepeatedTest(4)
-    public void kafkaAutoCommitDisabledDuringRebalance() throws Exception {
-        to.expectedMessageCount(1);
-        String firstMessage = "message-0";
-        to.expectedBodiesReceivedInAnyOrder(firstMessage);
-
-        ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", firstMessage);
-        producer.send(data);
-
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-
-        context.getRouteController().stopRoute("foo");
-        to.expectedMessageCount(0);
-
-        String secondMessage = "message-1";
-        data = new ProducerRecord<>(TOPIC, "1", secondMessage);
-        producer.send(data);
-
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-
-        // start a new route in order to rebalance kafka
-        context.getRouteController().startRoute("bar");
-        toBar.expectedMessageCount(1);
-
-        toBar.assertIsSatisfied();
-
-        context.getRouteController().stopRoute("bar");
-
-        // The route bar is not committing the offset, so by restarting foo, last 3 items will be processed
-        context.getRouteController().startRoute("foo");
-        to.expectedMessageCount(1);
-        to.expectedBodiesReceivedInAnyOrder("message-1");
-
-        to.assertIsSatisfied(3000);
-    }
-
-    @RepeatedTest(4)
-    public void kafkaManualCommit() throws Exception {
+    public void kafkaManualCommitTest(String topic) throws Exception {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
         // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
@@ -132,7 +59,7 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
 
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
             producer.send(data);
         }
 
@@ -147,7 +74,7 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         // Third step: While our route is stopped, we send 3 records more to Kafka test topic
         for (int k = 5; k < 8; k++) {
             String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
             producer.send(data);
         }
 
@@ -163,5 +90,4 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
 
         to.assertIsSatisfied(3000);
     }
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualAsyncCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualAsyncCommitIT.java
new file mode 100644
index 0000000..2d96fda
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualAsyncCommitIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.TestInstance;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@TestInstance(TestInstance.Lifecycle.PER_METHOD)
+public class KafkaConsumerManualAsyncCommitIT extends BaseManualCommitTestSupport {
+
+    public static final String TOPIC = "testManualAsyncCommitTest";
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
+    private Endpoint from;
+
+    @AfterEach
+    public void after() {
+        cleanupKafka(TOPIC);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from).routeId("foo").to(to).process(e -> {
+                    KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                    assertNotNull(manual);
+                    manual.commit();
+                });
+                from(from).routeId("bar").autoStartup(false).to(toBar);
+            }
+        };
+    }
+
+    @RepeatedTest(1)
+    public void kafkaManualCommit() throws Exception {
+        kafkaManualCommitTest(TOPIC);
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualNoopCommitIT.java
similarity index 58%
rename from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
rename to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualNoopCommitIT.java
index e92a55e..96d21ff 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualNoopCommitIT.java
@@ -16,54 +16,31 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Collections;
-import java.util.Properties;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.TestInstance;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 @TestInstance(TestInstance.Lifecycle.PER_METHOD)
-public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+public class KafkaConsumerManualNoopCommitIT extends BaseManualCommitTestSupport {
 
-    public static final String TOPIC = "testManualCommitTest";
+    public static final String TOPIC = "testManualNoopCommitTest";
 
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest")
     private Endpoint from;
 
-    @EndpointInject("mock:result")
-    private MockEndpoint to;
-
-    @EndpointInject("mock:resultBar")
-    private MockEndpoint toBar;
-
-    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
-
-    @BeforeEach
-    public void before() {
-        Properties props = getDefaultProperties();
-        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-    }
-
     @AfterEach
     public void after() {
-        if (producer != null) {
-            producer.close();
-        }
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        cleanupKafka(TOPIC);
     }
 
     @Override
@@ -82,7 +59,7 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         };
     }
 
-    @RepeatedTest(4)
+    @RepeatedTest(1)
     public void kafkaAutoCommitDisabledDuringRebalance() throws Exception {
         to.expectedMessageCount(1);
         String firstMessage = "message-0";
@@ -122,46 +99,9 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         to.assertIsSatisfied(3000);
     }
 
-    @RepeatedTest(4)
+    @RepeatedTest(1)
     public void kafkaManualCommit() throws Exception {
-        to.expectedMessageCount(5);
-        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
-        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
-        // manual commit
-        to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
-
-        for (int k = 0; k < 5; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
-            producer.send(data);
-        }
-
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-
-        // Second step: We shut down our route, we expect nothing will be recovered by our route
-        context.getRouteController().stopRoute("foo");
-        to.expectedMessageCount(0);
-
-        // Third step: While our route is stopped, we send 3 records more to Kafka test topic
-        for (int k = 5; k < 8; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
-            producer.send(data);
-        }
-
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-
-        // Fourth step: We start again our route, since we have been committing the offsets from the first step,
-        // we will expect to consume from the latest committed offset e.g from offset 5
-        context.getRouteController().startRoute("foo");
-        to.expectedMessageCount(3);
-        to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
-
-        to.assertIsSatisfied(3000);
+        kafkaManualCommitTest(TOPIC);
     }
 
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualSyncCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualSyncCommitIT.java
new file mode 100644
index 0000000..2cf8ee3
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualSyncCommitIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.TestInstance;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@TestInstance(TestInstance.Lifecycle.PER_METHOD)
+public class KafkaConsumerManualSyncCommitIT extends BaseManualCommitTestSupport {
+
+    public static final String TOPIC = "testManualCommitSyncTest";
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    private Endpoint from;
+
+    @AfterEach
+    public void after() {
+        cleanupKafka(TOPIC);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from).routeId("foo").to(to).process(e -> {
+                    KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                    assertNotNull(manual);
+                    manual.commit();
+                });
+                from(from).routeId("bar").autoStartup(false).to(toBar);
+            }
+        };
+    }
+
+    @RepeatedTest(1)
+    public void kafkaManualCommit() throws Exception {
+        kafkaManualCommitTest(TOPIC);
+    }
+
+}
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 9f99a72..73c1fa9 100644
--- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -228,25 +228,6 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
-         * Whether to perform an explicit auto commit when the consumer stops to
-         * ensure the broker has a commit from the last consumed message. This
-         * requires the option autoCommitEnable is turned on. The possible
-         * values are: sync, async, or none. And sync is the default value.
-         * 
-         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
-         * 
-         * Default: sync
-         * Group: consumer
-         * 
-         * @param autoCommitOnStop the value to set
-         * @return the dsl builder
-         */
-        default KafkaComponentBuilder autoCommitOnStop(
-                java.lang.String autoCommitOnStop) {
-            doSetProperty("autoCommitOnStop", autoCommitOnStop);
-            return this;
-        }
-        /**
          * What to do when there is no initial offset in ZooKeeper or if an
          * offset is out of range: earliest : automatically reset the offset to
          * the earliest offset latest : automatically reset the offset to the
@@ -2076,7 +2057,6 @@ public interface KafkaComponentBuilderFactory {
             case "allowManualCommit": getOrCreateConfiguration((KafkaComponent) component).setAllowManualCommit((boolean) value); return true;
             case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent) component).setAutoCommitEnable((boolean) value); return true;
             case "autoCommitIntervalMs": getOrCreateConfiguration((KafkaComponent) component).setAutoCommitIntervalMs((java.lang.Integer) value); return true;
-            case "autoCommitOnStop": getOrCreateConfiguration((KafkaComponent) component).setAutoCommitOnStop((java.lang.String) value); return true;
             case "autoOffsetReset": getOrCreateConfiguration((KafkaComponent) component).setAutoOffsetReset((java.lang.String) value); return true;
             case "breakOnFirstError": getOrCreateConfiguration((KafkaComponent) component).setBreakOnFirstError((boolean) value); return true;
             case "bridgeErrorHandler": ((KafkaComponent) component).setBridgeErrorHandler((boolean) value); return true;
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 5d3cab9..4a4b9b6 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -350,25 +350,6 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
-         * Whether to perform an explicit auto commit when the consumer stops to
-         * ensure the broker has a commit from the last consumed message. This
-         * requires the option autoCommitEnable is turned on. The possible
-         * values are: sync, async, or none. And sync is the default value.
-         * 
-         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
-         * 
-         * Default: sync
-         * Group: consumer
-         * 
-         * @param autoCommitOnStop the value to set
-         * @return the dsl builder
-         */
-        default KafkaEndpointConsumerBuilder autoCommitOnStop(
-                String autoCommitOnStop) {
-            doSetProperty("autoCommitOnStop", autoCommitOnStop);
-            return this;
-        }
-        /**
          * What to do when there is no initial offset in ZooKeeper or if an
          * offset is out of range: earliest : automatically reset the offset to
          * the earliest offset latest : automatically reset the offset to the