You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/03/10 12:45:33 UTC

[camel] branch master updated: CAMEL-15924: Add Manual Kafka offset capability to vertx kafka (#5194)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f5f3c  CAMEL-15924: Add Manual Kafka offset capability to vertx kafka (#5194)
24f5f3c is described below

commit 24f5f3c0f08e955f77a6307293ea499088d6eeeb
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Wed Mar 10 13:45:14 2021 +0100

    CAMEL-15924: Add Manual Kafka offset capability to vertx kafka (#5194)
    
    * CAMEL-15924: Add initial Manual Kafka offset
    
    * CAMEL-15924: Add Manual Kafka offset capability to vertx kafka
---
 .../camel/catalog/docs/vertx-kafka-component.adoc  |  39 ++++++-
 .../vertx/kafka/VertxKafkaComponentConfigurer.java |  14 ++-
 .../vertx/kafka/VertxKafkaEndpointConfigurer.java  |   6 +
 .../vertx/kafka/VertxKafkaEndpointUriFactory.java  |   3 +-
 .../camel/component/vertx/kafka/vertx-kafka.json   |   3 +
 .../src/main/docs/vertx-kafka-component.adoc       |  39 ++++++-
 .../component/vertx/kafka/VertxKafkaComponent.java |  17 +++
 .../component/vertx/kafka/VertxKafkaConstants.java |   1 +
 .../component/vertx/kafka/VertxKafkaConsumer.java  |  14 +++
 .../configuration/BaseVertxKafkaConfiguration.java |  22 ++++
 .../offset/DefaultVertxKafkaManualCommit.java      |  71 ++++++++++++
 .../DefaultVertxKafkaManualCommitFactory.java      |  30 +++++
 .../vertx/kafka/offset/VertxKafkaManualCommit.java |  28 +++++
 .../offset/VertxKafkaManualCommitFactory.java      |  34 ++++++
 .../vertx/kafka/MockConsumerInterceptor.java       |  35 ++++++
 .../kafka/VertxKafkaConsumerManualCommitTest.java  | 123 +++++++++++++++++++++
 .../dsl/VertxKafkaComponentBuilderFactory.java     |  48 ++++++++
 .../dsl/VertxKafkaEndpointBuilderFactory.java      |  49 ++++++++
 .../modules/ROOT/pages/vertx-kafka-component.adoc  |  39 ++++++-
 19 files changed, 607 insertions(+), 8 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc
index c504119..6686a6b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/vertx-kafka-component.adoc
@@ -74,7 +74,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (101 parameters):
+=== Query Parameters (102 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -102,6 +102,7 @@ with the following path and query parameters:
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -186,7 +187,7 @@ with the following path and query parameters:
 
 == Component Options
 // component options: START
-The Vert.x Kafka component supports 104 options, which are listed below.
+The Vert.x Kafka component supports 106 options, which are listed below.
 
 
 
@@ -216,6 +217,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -240,6 +242,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *seekToPosition* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeou [...]
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaManualCommitFactory* (consumer) | *Autowired* Factory to use for creating org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances. This allows to plugin a custom factory to create custom org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | VertxKafkaManualCommitFactory
 | *acks* (producer) | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, a [...]
 | *batchSize* (producer) | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batchi [...]
 | *bufferMemory* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compres [...]
@@ -404,6 +407,38 @@ from("vertx-kafka:test_topic?bootstrapServers=kafka9092")
 By default all headers are being filtered by `VertxKafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
 
+
+== Using manual commit with Kafka consumer
+
+By default the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.
+
+In case you want to force manual commits, you can use `VertxKafkaManualCommit` API from the Camel Exchange, stored on the message header.
+This requires to turn on manual commits by either setting the option `allowManualCommit` to `true` on the `VertxKafkaComponent`
+or on the endpoint, for example:
+
+[source,java]
+----
+VertxKafkaComponent kafka = new VertxKafkaComponent();
+kafka.setAllowManualCommit(true);
+...
+camelContext.addComponent("vertx-kafka", kafka);
+----
+
+You can then use the `VertxKafkaManualCommit` from Java code such as a Camel `Processor`:
+[source,java]
+----
+public void process(Exchange exchange) {
+    VertxKafkaManualCommit manual =
+        exchange.getIn().getHeader(VertxKafkaConstants.MANUAL_COMMIT, VertxKafkaManualCommit.class);
+    manual.commit();
+}
+----
+
+This will force a asynchronous commit to Kafka.
+
+If you want to use a custom implementation of `VertxKafkaManualCommit` then you can configure a custom `VertxKafkaManualCommitFactory`
+on the `VertxKafkaComponent` that creates instances of your custom implementation.
+
 === Consumer Example
 Here is the minimal route you need in order to read messages from Kafka.
 
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java
index 100966b..e6f9cf4 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponentConfigurer.java
@@ -33,6 +33,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "additionalProperties": getOrCreateConfiguration(target).setAdditionalProperties(property(camelContext, java.util.Map.class, value)); return true;
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": getOrCreateConfiguration(target).setAllowAutoCreateTopics(property(camelContext, boolean.class, value)); return true;
+        case "allowmanualcommit":
+        case "allowManualCommit": getOrCreateConfiguration(target).setAllowManualCommit(property(camelContext, boolean.class, value)); return true;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": getOrCreateConfiguration(target).setAutoCommitIntervalMs(property(camelContext, int.class, value)); return true;
         case "autooffsetreset":
@@ -88,6 +90,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "interceptorClasses": getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
         case "isolationlevel":
         case "isolationLevel": getOrCreateConfiguration(target).setIsolationLevel(property(camelContext, java.lang.String.class, value)); return true;
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory.class, value)); return true;
         case "keydeserializer":
         case "keyDeserializer": getOrCreateConfiguration(target).setKeyDeserializer(property(camelContext, java.lang.String.class, value)); return true;
         case "keyserializer":
@@ -238,7 +242,7 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
 
     @Override
     public String[] getAutowiredNames() {
-        return new String[]{"vertx","vertxKafkaClientFactory"};
+        return new String[]{"kafkaManualCommitFactory","vertx","vertxKafkaClientFactory"};
     }
 
     @Override
@@ -249,6 +253,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "additionalProperties": return java.util.Map.class;
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": return boolean.class;
+        case "allowmanualcommit":
+        case "allowManualCommit": return boolean.class;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return int.class;
         case "autooffsetreset":
@@ -304,6 +310,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "interceptorClasses": return java.lang.String.class;
         case "isolationlevel":
         case "isolationLevel": return java.lang.String.class;
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": return org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory.class;
         case "keydeserializer":
         case "keyDeserializer": return java.lang.String.class;
         case "keyserializer":
@@ -461,6 +469,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "additionalProperties": return getOrCreateConfiguration(target).getAdditionalProperties();
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": return getOrCreateConfiguration(target).isAllowAutoCreateTopics();
+        case "allowmanualcommit":
+        case "allowManualCommit": return getOrCreateConfiguration(target).isAllowManualCommit();
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return getOrCreateConfiguration(target).getAutoCommitIntervalMs();
         case "autooffsetreset":
@@ -516,6 +526,8 @@ public class VertxKafkaComponentConfigurer extends PropertyConfigurerSupport imp
         case "interceptorClasses": return getOrCreateConfiguration(target).getInterceptorClasses();
         case "isolationlevel":
         case "isolationLevel": return getOrCreateConfiguration(target).getIsolationLevel();
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": return target.getKafkaManualCommitFactory();
         case "keydeserializer":
         case "keyDeserializer": return getOrCreateConfiguration(target).getKeyDeserializer();
         case "keyserializer":
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java
index 3b9e546..42c8dac 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointConfigurer.java
@@ -26,6 +26,8 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl
         case "additionalProperties": target.getConfiguration().setAdditionalProperties(property(camelContext, java.util.Map.class, value)); return true;
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": target.getConfiguration().setAllowAutoCreateTopics(property(camelContext, boolean.class, value)); return true;
+        case "allowmanualcommit":
+        case "allowManualCommit": target.getConfiguration().setAllowManualCommit(property(camelContext, boolean.class, value)); return true;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": target.getConfiguration().setAutoCommitIntervalMs(property(camelContext, int.class, value)); return true;
         case "autooffsetreset":
@@ -233,6 +235,8 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl
         case "additionalProperties": return java.util.Map.class;
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": return boolean.class;
+        case "allowmanualcommit":
+        case "allowManualCommit": return boolean.class;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return int.class;
         case "autooffsetreset":
@@ -441,6 +445,8 @@ public class VertxKafkaEndpointConfigurer extends PropertyConfigurerSupport impl
         case "additionalProperties": return target.getConfiguration().getAdditionalProperties();
         case "allowautocreatetopics":
         case "allowAutoCreateTopics": return target.getConfiguration().isAllowAutoCreateTopics();
+        case "allowmanualcommit":
+        case "allowManualCommit": return target.getConfiguration().isAllowManualCommit();
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return target.getConfiguration().getAutoCommitIntervalMs();
         case "autooffsetreset":
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java
index ebf346d..9916467 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/java/org/apache/camel/component/vertx/kafka/VertxKafkaEndpointUriFactory.java
@@ -20,7 +20,8 @@ public class VertxKafkaEndpointUriFactory extends org.apache.camel.support.compo
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(102);
+        Set<String> props = new HashSet<>(103);
+        props.add("allowManualCommit");
         props.add("receiveBufferBytes");
         props.add("saslLoginRefreshWindowFactor");
         props.add("socketConnectionSetupTimeoutMs");
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json
index 26685d8..95dd3b6 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/generated/resources/org/apache/camel/component/vertx/kafka/vertx-kafka.json
@@ -45,6 +45,7 @@
     "socketConnectionSetupTimeoutMaxMs": { "kind": "property", "displayName": "Socket Connection Setup Timeout Max Ms", "group": "common", "label": "common", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "2m7s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the clien [...]
     "socketConnectionSetupTimeoutMs": { "kind": "property", "displayName": "Socket Connection Setup Timeout Ms", "group": "common", "label": "common", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the client will wait for  [...]
     "allowAutoCreateTopics": { "kind": "property", "displayName": "Allow Auto Create Topics", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Allow automatic topic creation on the broker when subscrib [...]
+    "allowManualCommit": { "kind": "property", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via org.apache.camel.compone [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The frequency in milliseconds that the consumer offsets are aut [...]
     "autoOffsetReset": { "kind": "property", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "What to do when t [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
@@ -69,6 +70,7 @@
     "seekToPosition": { "kind": "property", "displayName": "Seek To Position", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect client failures when using Kafka's group man [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Deseri [...]
+    "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances. This allows to plugin [...]
     "acks": { "kind": "property", "displayName": "Acks", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all", "-1", "0", "1" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The number of acknowledgments the producer requires t [...]
     "batchSize": { "kind": "property", "displayName": "Batch Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 16384, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The producer will attempt to batch records together into fewer requests whenever multip [...]
     "bufferMemory": { "kind": "property", "displayName": "Buffer Memory", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 33554432, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...]
@@ -151,6 +153,7 @@
     "socketConnectionSetupTimeoutMaxMs": { "kind": "parameter", "displayName": "Socket Connection Setup Timeout Max Ms", "group": "common", "label": "common", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "2m7s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The maximum amount of time the clie [...]
     "socketConnectionSetupTimeoutMs": { "kind": "parameter", "displayName": "Socket Connection Setup Timeout Ms", "group": "common", "label": "common", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The amount of time the client will wait for [...]
     "allowAutoCreateTopics": { "kind": "parameter", "displayName": "Allow Auto Create Topics", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Allow automatic topic creation on the broker when subscri [...]
+    "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via org.apache.camel.compon [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5s", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "The frequency in milliseconds that the consumer offsets are au [...]
     "autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration", "configurationField": "configuration", "description": "What to do when  [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...]
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
index c504119..6686a6b 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
@@ -74,7 +74,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (101 parameters):
+=== Query Parameters (102 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -102,6 +102,7 @@ with the following path and query parameters:
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -186,7 +187,7 @@ with the following path and query parameters:
 
 == Component Options
 // component options: START
-The Vert.x Kafka component supports 104 options, which are listed below.
+The Vert.x Kafka component supports 106 options, which are listed below.
 
 
 
@@ -216,6 +217,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -240,6 +242,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *seekToPosition* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeou [...]
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaManualCommitFactory* (consumer) | *Autowired* Factory to use for creating org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances. This allows to plugin a custom factory to create custom org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | VertxKafkaManualCommitFactory
 | *acks* (producer) | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, a [...]
 | *batchSize* (producer) | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batchi [...]
 | *bufferMemory* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compres [...]
@@ -404,6 +407,38 @@ from("vertx-kafka:test_topic?bootstrapServers=kafka9092")
 By default all headers are being filtered by `VertxKafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
 
+
+== Using manual commit with Kafka consumer
+
+By default the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.
+
+In case you want to force manual commits, you can use `VertxKafkaManualCommit` API from the Camel Exchange, stored on the message header.
+This requires to turn on manual commits by either setting the option `allowManualCommit` to `true` on the `VertxKafkaComponent`
+or on the endpoint, for example:
+
+[source,java]
+----
+VertxKafkaComponent kafka = new VertxKafkaComponent();
+kafka.setAllowManualCommit(true);
+...
+camelContext.addComponent("vertx-kafka", kafka);
+----
+
+You can then use the `VertxKafkaManualCommit` from Java code such as a Camel `Processor`:
+[source,java]
+----
+public void process(Exchange exchange) {
+    VertxKafkaManualCommit manual =
+        exchange.getIn().getHeader(VertxKafkaConstants.MANUAL_COMMIT, VertxKafkaManualCommit.class);
+    manual.commit();
+}
+----
+
+This will force a asynchronous commit to Kafka.
+
+If you want to use a custom implementation of `VertxKafkaManualCommit` then you can configure a custom `VertxKafkaManualCommitFactory`
+on the `VertxKafkaComponent` that creates instances of your custom implementation.
+
 === Consumer Example
 Here is the minimal route you need in order to read messages from Kafka.
 
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java
index 1616300..789929d 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaComponent.java
@@ -23,6 +23,8 @@ import io.vertx.core.VertxOptions;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
+import org.apache.camel.component.vertx.kafka.offset.DefaultVertxKafkaManualCommitFactory;
+import org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
@@ -42,6 +44,8 @@ public class VertxKafkaComponent extends DefaultComponent {
     private VertxOptions vertxOptions;
     @Metadata(label = "advanced", autowired = true)
     private VertxKafkaClientFactory vertxKafkaClientFactory = new DefaultVertxKafkaClientFactory();
+    @Metadata(label = "consumer,advanced", autowired = true)
+    private VertxKafkaManualCommitFactory kafkaManualCommitFactory = new DefaultVertxKafkaManualCommitFactory();
 
     public VertxKafkaComponent() {
     }
@@ -146,4 +150,17 @@ public class VertxKafkaComponent extends DefaultComponent {
         this.vertxKafkaClientFactory = vertxKafkaClientFactory;
     }
 
+    /**
+     * Factory to use for creating {@link org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit}
+     * instances. This allows to plugin a custom factory to create custom
+     * {@link org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit} instances in case special logic is
+     * needed when doing manual commits that deviates from the default implementation that comes out of the box.
+     */
+    public VertxKafkaManualCommitFactory getKafkaManualCommitFactory() {
+        return kafkaManualCommitFactory;
+    }
+
+    public void setKafkaManualCommitFactory(VertxKafkaManualCommitFactory kafkaManualCommitFactory) {
+        this.kafkaManualCommitFactory = kafkaManualCommitFactory;
+    }
 }
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
index 03d367c..b822097 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
@@ -28,6 +28,7 @@ public final class VertxKafkaConstants {
     public static final String OFFSET = HEADER_PREFIX + "Offset";
     public static final String HEADERS = HEADER_PREFIX + "Headers";
     public static final String TIMESTAMP = HEADER_PREFIX + "Timestamp";
+    public static final String MANUAL_COMMIT = HEADER_PREFIX + "ManualCommit";
     // headers evaluated by the producer only
     public static final String OVERRIDE_TOPIC = HEADER_PREFIX + "OverrideTopic";
 
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
index 3327b1a..b1b79aa 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.vertx.kafka;
 import java.util.Map;
 
 import io.vertx.core.buffer.Buffer;
+import io.vertx.kafka.client.common.TopicPartition;
 import io.vertx.kafka.client.consumer.KafkaConsumer;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import org.apache.camel.Exchange;
@@ -27,6 +28,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
 import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
+import org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit;
 import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
@@ -151,9 +153,21 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
         message.setHeader(VertxKafkaConstants.TIMESTAMP, record.timestamp());
         message.setHeader(VertxKafkaConstants.MESSAGE_KEY, record.key());
 
+        // set headers for the manual offsets commit
+        if (getConfiguration().isAllowManualCommit()) {
+            message.setHeader(VertxKafkaConstants.MANUAL_COMMIT, createKafkaManualCommit(kafkaConsumer, record.topic(),
+                    new TopicPartition(record.topic(), record.partition()), record.offset()));
+        }
+
         return exchange;
     }
 
+    private VertxKafkaManualCommit createKafkaManualCommit(
+            final KafkaConsumer<Object, Object> consumer, final String topicName,
+            final TopicPartition partition, final long offset) {
+        return getEndpoint().getComponent().getKafkaManualCommitFactory().create(consumer, topicName, partition, offset);
+    }
+
     private class ConsumerOnCompletion extends SynchronizationAdapter {
 
         @Override
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java
index 90ca519..2ad5bfa 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/configuration/BaseVertxKafkaConfiguration.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.vertx.kafka.configuration;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.component.vertx.kafka.VertxKafkaHeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -25,6 +26,8 @@ public abstract class BaseVertxKafkaConfiguration implements HeaderFilterStrateg
 
     @UriParam(label = "common")
     private HeaderFilterStrategy headerFilterStrategy = new VertxKafkaHeaderFilterStrategy();
+    @UriParam(label = "consumer")
+    private boolean allowManualCommit;
 
     /**
      * To use a custom HeaderFilterStrategy to filter header to and from Camel message.
@@ -37,4 +40,23 @@ public abstract class BaseVertxKafkaConfiguration implements HeaderFilterStrateg
         this.headerFilterStrategy = headerFilterStrategy;
     }
 
+    /**
+     * Whether to allow doing manual commits via
+     * {@link org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit}.
+     * <p/>
+     * If this option is enabled then an instance of
+     * {@link org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit} is stored on the {@link Exchange}
+     * message header, which allows end users to access this API and perform manual offset commits via the Kafka
+     * consumer.
+     *
+     * Note: To take full control of the offset committing, you may need to disable the Kafka Consumer default auto
+     * commit behavior by setting 'enableAutoCommit' to 'false'.
+     */
+    public boolean isAllowManualCommit() {
+        return allowManualCommit;
+    }
+
+    public void setAllowManualCommit(boolean allowManualCommit) {
+        this.allowManualCommit = allowManualCommit;
+    }
 }
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommit.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommit.java
new file mode 100644
index 0000000..d3fd533
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka.offset;
+
+import java.util.Collections;
+
+import io.vertx.kafka.client.common.TopicPartition;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+import io.vertx.kafka.client.consumer.OffsetAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultVertxKafkaManualCommit implements VertxKafkaManualCommit {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultVertxKafkaManualCommit.class);
+
+    private final KafkaConsumer<Object, Object> kafkaConsumer;
+    private final String topicName;
+    private final TopicPartition partition;
+    private final long recordOffset;
+
+    public DefaultVertxKafkaManualCommit(KafkaConsumer<Object, Object> kafkaConsumer,
+                                         String topicName, TopicPartition partition, long recordOffset) {
+        this.kafkaConsumer = kafkaConsumer;
+        this.topicName = topicName;
+        this.partition = partition;
+        this.recordOffset = recordOffset;
+    }
+
+    @Override
+    public void commit() {
+        commitOffset(partition, recordOffset);
+    }
+
+    private void commitOffset(final TopicPartition partition, final long recordOffset) {
+        if (recordOffset != -1) {
+            LOG.info("Commit offsets from topic {} with offset: {}", topicName, recordOffset);
+            kafkaConsumer.commit(Collections.singletonMap(partition, new OffsetAndMetadata(recordOffset + 1, "")));
+        }
+    }
+
+    public KafkaConsumer<Object, Object> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public TopicPartition getPartition() {
+        return partition;
+    }
+
+    public long getRecordOffset() {
+        return recordOffset;
+    }
+}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommitFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommitFactory.java
new file mode 100644
index 0000000..baf2073
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/DefaultVertxKafkaManualCommitFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka.offset;
+
+import io.vertx.kafka.client.common.TopicPartition;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+
+public class DefaultVertxKafkaManualCommitFactory implements VertxKafkaManualCommitFactory {
+
+    @Override
+    public VertxKafkaManualCommit create(
+            KafkaConsumer<Object, Object> consumer, String topicName,
+            TopicPartition partition, long recordOffset) {
+        return new DefaultVertxKafkaManualCommit(consumer, topicName, partition, recordOffset);
+    }
+}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommit.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommit.java
new file mode 100644
index 0000000..07353a6
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommit.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka.offset;
+
+/**
+ * Can be used for forcing manual offset commit when using Kafka consumer.
+ */
+public interface VertxKafkaManualCommit {
+
+    /**
+     * Commit offsets to Kafka
+     */
+    void commit();
+}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommitFactory.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommitFactory.java
new file mode 100644
index 0000000..6122f32
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/offset/VertxKafkaManualCommitFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka.offset;
+
+import io.vertx.kafka.client.common.TopicPartition;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+import org.apache.camel.Exchange;
+
+/**
+ * Factory to create a new {@link VertxKafkaManualCommit} to store on the {@link Exchange}.
+ */
+public interface VertxKafkaManualCommitFactory {
+
+    /**
+     * Creates a new instance
+     */
+    VertxKafkaManualCommit create(
+            KafkaConsumer<Object, Object> consumer, String topicName,
+            TopicPartition partition, long recordOffset);
+}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/MockConsumerInterceptor.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/MockConsumerInterceptor.java
new file mode 100644
index 0000000..a5dccfa
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/MockConsumerInterceptor.java
@@ -0,0 +1,35 @@
+package org.apache.camel.component.vertx.kafka;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
+
+    public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = new ArrayList<>();
+
+    @Override
+    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
+        recordsCaptured.add(consumerRecords);
+        return consumerRecords;
+    }
+
+    @Override
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
+        // noop
+    }
+
+    @Override
+    public void close() {
+        // noop
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        // noop
+    }
+}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumerManualCommitTest.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumerManualCommitTest.java
new file mode 100644
index 0000000..04695d2
--- /dev/null
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumerManualCommitTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class VertxKafkaConsumerManualCommitTest extends BaseEmbeddedKafkaTest {
+
+    public static final String TOPIC = "test";
+
+    @EndpointInject("vertx-kafka:" + TOPIC
+                    + "?groupId=group1&sessionTimeoutMs=30000&enableAutoCommit=false&autoOffsetReset=earliest&"
+                    + "allowManualCommit=true")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from(from).routeId("foo").to(to).process(e -> {
+                    VertxKafkaManualCommit manual
+                            = e.getIn().getHeader(VertxKafkaConstants.MANUAL_COMMIT, VertxKafkaManualCommit.class);
+                    assertNotNull(manual);
+                    manual.commit();
+                });
+            }
+        };
+    }
+
+    @Test
+    public void kafkaManualCommit() throws Exception {
+        // We disable the default autoCommit and we take control on committing the offsets
+        // First step: We send first 5 records to Kafka, we expect our consumer to receive them and commit the offsets after consuming the records
+        // through manual.commit();
+        to.expectedMessageCount(5);
+
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Second step: We shut down our route, we expect nothing will be recovered by our route
+        context.getRouteController().stopRoute("foo");
+        to.expectedMessageCount(0);
+
+        // Third step: While our route is stopped, we send 3 records more to Kafka test topic
+        for (int k = 5; k < 8; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Fourth step: We start again our route, since we have been committing the offsets from the first step,
+        // we will expect to consume from the latest committed offset e.g from offset 5
+        context.getRouteController().startRoute("foo");
+        to.expectedMessageCount(3);
+
+        // give some time for the route to start again
+        synchronized (this) {
+            Thread.sleep(1000);
+        }
+
+        to.assertIsSatisfied(3000);
+    }
+}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java
index 4b66285..3c60795 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/VertxKafkaComponentBuilderFactory.java
@@ -482,6 +482,30 @@ public interface VertxKafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * Whether to allow doing manual commits via
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit.
+         * If this option is enabled then an instance of
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit
+         * is stored on the Exchange message header, which allows end users to
+         * access this API and perform manual offset commits via the Kafka
+         * consumer. Note: To take full control of the offset committing, you
+         * may need to disable the Kafka Consumer default auto commit behavior
+         * by setting 'enableAutoCommit' to 'false'.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param allowManualCommit the value to set
+         * @return the dsl builder
+         */
+        default VertxKafkaComponentBuilder allowManualCommit(
+                boolean allowManualCommit) {
+            doSetProperty("allowManualCommit", allowManualCommit);
+            return this;
+        }
+        /**
          * The frequency in milliseconds that the consumer offsets are
          * auto-committed to Kafka if enable.auto.commit is set to true.
          * 
@@ -961,6 +985,28 @@ public interface VertxKafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * Factory to use for creating
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit
+         * instances. This allows to plugin a custom factory to create custom
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit
+         * instances in case special logic is needed when doing manual commits
+         * that deviates from the default implementation that comes out of the
+         * box.
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory&lt;/code&gt; type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param kafkaManualCommitFactory the value to set
+         * @return the dsl builder
+         */
+        default VertxKafkaComponentBuilder kafkaManualCommitFactory(
+                org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory kafkaManualCommitFactory) {
+            doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory);
+            return this;
+        }
+        /**
          * The number of acknowledgments the producer requires the leader to
          * have received before considering a request complete. This controls
          * the durability of records that are sent. The following settings are
@@ -2123,6 +2169,7 @@ public interface VertxKafkaComponentBuilderFactory {
             case "socketConnectionSetupTimeoutMaxMs": getOrCreateConfiguration((VertxKafkaComponent) component).setSocketConnectionSetupTimeoutMaxMs((long) value); return true;
             case "socketConnectionSetupTimeoutMs": getOrCreateConfiguration((VertxKafkaComponent) component).setSocketConnectionSetupTimeoutMs((long) value); return true;
             case "allowAutoCreateTopics": getOrCreateConfiguration((VertxKafkaComponent) component).setAllowAutoCreateTopics((boolean) value); return true;
+            case "allowManualCommit": getOrCreateConfiguration((VertxKafkaComponent) component).setAllowManualCommit((boolean) value); return true;
             case "autoCommitIntervalMs": getOrCreateConfiguration((VertxKafkaComponent) component).setAutoCommitIntervalMs((int) value); return true;
             case "autoOffsetReset": getOrCreateConfiguration((VertxKafkaComponent) component).setAutoOffsetReset((java.lang.String) value); return true;
             case "bridgeErrorHandler": ((VertxKafkaComponent) component).setBridgeErrorHandler((boolean) value); return true;
@@ -2147,6 +2194,7 @@ public interface VertxKafkaComponentBuilderFactory {
             case "seekToPosition": getOrCreateConfiguration((VertxKafkaComponent) component).setSeekToPosition((java.lang.String) value); return true;
             case "sessionTimeoutMs": getOrCreateConfiguration((VertxKafkaComponent) component).setSessionTimeoutMs((int) value); return true;
             case "valueDeserializer": getOrCreateConfiguration((VertxKafkaComponent) component).setValueDeserializer((java.lang.String) value); return true;
+            case "kafkaManualCommitFactory": ((VertxKafkaComponent) component).setKafkaManualCommitFactory((org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommitFactory) value); return true;
             case "acks": getOrCreateConfiguration((VertxKafkaComponent) component).setAcks((java.lang.String) value); return true;
             case "batchSize": getOrCreateConfiguration((VertxKafkaComponent) component).setBatchSize((int) value); return true;
             case "bufferMemory": getOrCreateConfiguration((VertxKafkaComponent) component).setBufferMemory((long) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java
index c634c82..7d16558 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/VertxKafkaEndpointBuilderFactory.java
@@ -766,6 +766,55 @@ public interface VertxKafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * Whether to allow doing manual commits via
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit.
+         * If this option is enabled then an instance of
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit
+         * is stored on the Exchange message header, which allows end users to
+         * access this API and perform manual offset commits via the Kafka
+         * consumer. Note: To take full control of the offset committing, you
+         * may need to disable the Kafka Consumer default auto commit behavior
+         * by setting 'enableAutoCommit' to 'false'.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param allowManualCommit the value to set
+         * @return the dsl builder
+         */
+        default VertxKafkaEndpointConsumerBuilder allowManualCommit(
+                boolean allowManualCommit) {
+            doSetProperty("allowManualCommit", allowManualCommit);
+            return this;
+        }
+        /**
+         * Whether to allow doing manual commits via
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit.
+         * If this option is enabled then an instance of
+         * org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit
+         * is stored on the Exchange message header, which allows end users to
+         * access this API and perform manual offset commits via the Kafka
+         * consumer. Note: To take full control of the offset committing, you
+         * may need to disable the Kafka Consumer default auto commit behavior
+         * by setting 'enableAutoCommit' to 'false'.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param allowManualCommit the value to set
+         * @return the dsl builder
+         */
+        default VertxKafkaEndpointConsumerBuilder allowManualCommit(
+                String allowManualCommit) {
+            doSetProperty("allowManualCommit", allowManualCommit);
+            return this;
+        }
+        /**
          * The frequency in milliseconds that the consumer offsets are
          * auto-committed to Kafka if enable.auto.commit is set to true.
          * 
diff --git a/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc b/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc
index c46ce1f..acce79e 100644
--- a/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/vertx-kafka-component.adoc
@@ -76,7 +76,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (101 parameters):
+=== Query Parameters (102 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -104,6 +104,7 @@ with the following path and query parameters:
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -188,7 +189,7 @@ with the following path and query parameters:
 
 == Component Options
 // component options: START
-The Vert.x Kafka component supports 104 options, which are listed below.
+The Vert.x Kafka component supports 106 options, which are listed below.
 
 
 
@@ -218,6 +219,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *socketConnectionSetupTimeout{zwsp}MaxMs* (common) | The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. | 2m7s | long
 | *socketConnectionSetupTimeoutMs* (common) | The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. | 10s | long
 | *allowAutoCreateTopics* (consumer) | Allow automatic topic creation on the broker when subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the broker allows for it using auto.create.topics.enable broker configuration. This configuration must be set to false when using brokers older than 0.11.0 | true | boolean
+| *allowManualCommit* (consumer) | Whether to allow doing manual commits via org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit. If this option is enabled then an instance of org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. Note: To take full control of the offset committing, you may need to disable the Kafka C [...]
 | *autoCommitIntervalMs* (consumer) | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. | 5s | int
 | *autoOffsetReset* (consumer) | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offsetlatest: automatically reset the offset to the latest offsetnone: throw exception to the consumer if no previous offset is found for the consumer's groupanything else: throw exception to the consumer. There are 3 enums and the value can be [...]
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
@@ -242,6 +244,7 @@ The Vert.x Kafka component supports 104 options, which are listed below.
 | *seekToPosition* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeou [...]
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaManualCommitFactory* (consumer) | *Autowired* Factory to use for creating org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances. This allows to plugin a custom factory to create custom org.apache.camel.component.vertx.kafka.offset.VertxKafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | VertxKafkaManualCommitFactory
 | *acks* (producer) | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, a [...]
 | *batchSize* (producer) | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batchi [...]
 | *bufferMemory* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compres [...]
@@ -406,6 +409,38 @@ from("vertx-kafka:test_topic?bootstrapServers=kafka9092")
 By default all headers are being filtered by `VertxKafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
 
+
+== Using manual commit with Kafka consumer
+
+By default the Kafka consumer will use auto commit, where the offset will be committed automatically in the background using a given interval.
+
+In case you want to force manual commits, you can use `VertxKafkaManualCommit` API from the Camel Exchange, stored on the message header.
+This requires to turn on manual commits by either setting the option `allowManualCommit` to `true` on the `VertxKafkaComponent`
+or on the endpoint, for example:
+
+[source,java]
+----
+VertxKafkaComponent kafka = new VertxKafkaComponent();
+kafka.setAllowManualCommit(true);
+...
+camelContext.addComponent("vertx-kafka", kafka);
+----
+
+You can then use the `VertxKafkaManualCommit` from Java code such as a Camel `Processor`:
+[source,java]
+----
+public void process(Exchange exchange) {
+    VertxKafkaManualCommit manual =
+        exchange.getIn().getHeader(VertxKafkaConstants.MANUAL_COMMIT, VertxKafkaManualCommit.class);
+    manual.commit();
+}
+----
+
+This will force a asynchronous commit to Kafka.
+
+If you want to use a custom implementation of `VertxKafkaManualCommit` then you can configure a custom `VertxKafkaManualCommitFactory`
+on the `VertxKafkaComponent` that creates instances of your custom implementation.
+
 === Consumer Example
 Here is the minimal route you need in order to read messages from Kafka.