Posted to commits@camel.apache.org by ac...@apache.org on 2017/07/26 07:48:54 UTC

[2/2] camel git commit: CAMEL-11542 - camel-kafka - Add any new options from kafka 0.11.0 to the endpoint

CAMEL-11542 - camel-kafka - Add any new options from kafka 0.11.0 to the endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0753f11f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0753f11f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0753f11f

Branch: refs/heads/master
Commit: 0753f11f1070b586555dedd171d590be737d43ba
Parents: 7b921b0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Jul 26 09:44:39 2017 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Jul 26 09:48:38 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../component/kafka/KafkaConfiguration.java     | 96 +++++++++++---------
 .../component/kafka/KafkaComponentTest.java     |  1 +
 .../springboot/KafkaComponentConfiguration.java | 17 ++++
 4 files changed, 75 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7b01d05..e67d7a7 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -67,7 +67,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. |  | String
 |=======================================================================
 
-#### Query Parameters (84 parameters):
+#### Query Parameters (85 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -105,6 +105,7 @@ with the following path and query parameters:
 | **circularTopicDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
+| **enableIdempotence** (producer) | If set to 'true' the producer will ensure that exactly one copy of each message is written in the stream. If 'false' producer retries may write duplicates of the retried message in the stream. If set to true this option will require max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero and additionally acks must be set to 'all'. | false | boolean
 | **key** (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY |  | String
 | **keySerializerClass** (producer) | The serializer class for keys (defaults to the same as for messages if nothing is given). | org.apache.kafka.common.serialization.StringSerializer | String
 | **lingerMs** (producer) | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaythat is rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for example would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load. | 0 | Integer
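
For context, a minimal sketch of how the new producer option could be used from a Camel route. Only enableIdempotence and maxInFlightRequest appear in this commit; brokers, retries and requestRequiredAcks are assumed to be the existing Kafka component options that map to Kafka's bootstrap.servers, retries and acks settings.

    import org.apache.camel.builder.RouteBuilder;

    // Illustrative route: enables idempotent producing via the endpoint URI. Per the
    // option description above, enable.idempotence also requires
    // max.in.flight.requests.per.connection=1, a non-zero retries value and acks=all,
    // so the companion options are set explicitly here.
    public class IdempotentKafkaRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .to("kafka:test?brokers=localhost:9092"
                    + "&enableIdempotence=true"
                    + "&maxInFlightRequest=1"
                    + "&retries=3"
                    + "&requestRequiredAcks=all");
        }
    }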

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index ddb04a9..ae54443 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -206,6 +206,10 @@ public class KafkaConfiguration implements Cloneable {
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "50")
     private Integer reconnectBackoffMs = 50;
+    //enable.idempotence
+    //reconnect.backoff.ms
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean enableIdempotence = false;
 
     // SSL
     @UriParam(label = "common,security")
@@ -307,13 +311,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
         addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
         addPropertyIfNotNull(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
-        // SSL
-        applySslConfiguration(props, getSslContextParameters());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getRetries());
         addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
         addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
@@ -324,15 +321,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
         addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
         addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
-        // Security protocol
-        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
         addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
@@ -340,6 +329,24 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
         addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isEnableIdempotence());
+        // SSL
+        applySslConfiguration(props, getSslContextParameters());
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         //SASL
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
@@ -348,11 +355,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
         addListPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
 
         return props;
     }
@@ -368,28 +370,12 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
         addPropertyIfNotNull(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
-        // SSL
-        applySslConfiguration(props, getSslContextParameters());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
         addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
         addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
         addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
         addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
         addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
-        // Security protocol
-        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
-        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
-        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
         addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
@@ -400,6 +386,26 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
         addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        
+        // SSL
+        applySslConfiguration(props, getSslContextParameters());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        // Security protocol
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
         //SASL
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
@@ -408,11 +414,6 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
         addListPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism());
-        //SSL
-        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
-        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
-        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
-        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         return props;
     }
 
@@ -1495,4 +1496,17 @@ public class KafkaConfiguration implements Cloneable {
     public void setInterceptorClasses(String interceptorClasses) {
         this.interceptorClasses = interceptorClasses;
     }
+
+    public boolean isEnableIdempotence() {
+        return enableIdempotence;
+    }
+
+    /**
+     * If set to 'true' the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer 
+     * retries may write duplicates of the retried message in the stream. If set to true this option will require max.in.flight.requests.per.connection to be set to 1 and 
+     * retries cannot be zero and additionally acks must be set to 'all'. 
+     */
+    public void setEnableIdempotence(boolean enableIdempotence) {
+        this.enableIdempotence = enableIdempotence;
+    }
 }
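
A minimal sketch of the conditional-population pattern used by the producer and consumer property builders above. The real addPropertyIfNotNull helper is part of KafkaConfiguration and its body is not shown in this diff; this illustrative stand-in only demonstrates the behaviour implied by the call sites: a key is emitted only when a value has been configured, so Kafka's own defaults apply otherwise.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    // Illustrative stand-in for the addPropertyIfNotNull pattern: a key is only added
    // when the corresponding option has been configured. The real helper lives in
    // KafkaConfiguration and is not shown in this diff.
    public final class ProducerPropsSketch {

        private static void addPropertyIfNotNull(Properties props, String key, Object value) {
            if (value != null) {
                // store as String, matching how KafkaComponentTest builds its expected Properties
                props.put(key, value.toString());
            }
        }

        public static Properties example(Boolean enableIdempotence, Integer retries) {
            Properties props = new Properties();
            addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
            addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, retries);
            return props;
        }

        private ProducerPropsSketch() {
        }
    }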

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 5a8af58..fa75551 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -149,6 +149,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
         props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, TLSv1");
         props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");

http://git-wip-us.apache.org/repos/asf/camel/blob/0753f11f/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 2ced5ac..419a037 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -672,6 +672,15 @@ public class KafkaComponentConfiguration
          * class cast exception in runtime
          */
         private String interceptorClasses;
+        /**
+         * If set to 'true' the producer will ensure that exactly one copy of
+         * each message is written in the stream. If 'false', producer retries
+         * may write duplicates of the retried message in the stream. If set to
+         * true this option will require max.in.flight.requests.per.connection
+         * to be set to 1 and retries cannot be zero and additionally acks must
+         * be set to 'all'.
+         */
+        private Boolean enableIdempotence = false;
 
         public String getGroupId() {
             return groupId;
@@ -1325,5 +1334,13 @@ public class KafkaComponentConfiguration
         public void setInterceptorClasses(String interceptorClasses) {
             this.interceptorClasses = interceptorClasses;
         }
+
+        public Boolean getEnableIdempotence() {
+            return enableIdempotence;
+        }
+
+        public void setEnableIdempotence(Boolean enableIdempotence) {
+            this.enableIdempotence = enableIdempotence;
+        }
     }
 }
\ No newline at end of file
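
With the Spring Boot starter, the new flag would be set through the generated KafkaComponentConfiguration shown above (via relaxed binding, presumably something like camel.component.kafka.configuration.enable-idempotence=true, although the exact prefix depends on the nested configuration class, which this hunk does not show). A programmatic sketch, assuming the usual Camel convention of registering a KafkaComponent bean named "kafka" and the existing brokers option:

    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.component.kafka.KafkaConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // Illustrative only: the bean name "kafka", the setConfiguration(...) setter and the
    // brokers option are assumed from common Camel component conventions and are not part
    // of this diff; setEnableIdempotence(...) is the setter added in this commit.
    @Configuration
    public class KafkaIdempotenceAutoConfig {

        @Bean(name = "kafka")
        public KafkaComponent kafkaComponent() {
            KafkaConfiguration configuration = new KafkaConfiguration();
            configuration.setBrokers("localhost:9092");
            configuration.setEnableIdempotence(true);

            KafkaComponent component = new KafkaComponent();
            component.setConfiguration(configuration);
            return component;
        }
    }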