Posted to commits@camel.apache.org by da...@apache.org on 2017/03/12 09:20:40 UTC

camel git commit: CAMEL-10994: camel-kafka - Allow to configure more options on component level

Repository: camel
Updated Branches:
  refs/heads/master 330228ac7 -> 6a87dad94


CAMEL-10994: camel-kafka - Allow to configure more options on component level


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6a87dad9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6a87dad9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6a87dad9

Branch: refs/heads/master
Commit: 6a87dad940af9868d6be3cdc70c1193ab26bdcf1
Parents: 330228a
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 12 10:08:08 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 12 10:15:17 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |   10 +-
 .../camel/component/kafka/KafkaComponent.java   |   34 +-
 .../component/kafka/KafkaConfiguration.java     |   47 +-
 .../springboot/KafkaComponentConfiguration.java | 1154 ++++++++++++++++++
 4 files changed, 1209 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
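For orientation, here is a minimal sketch of how the new component-level
configuration introduced by this commit could be wired up from Java. The
broker address, group id and the code around them are illustrative
assumptions, not part of the commit itself:

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.component.kafka.KafkaConfiguration;
    import org.apache.camel.impl.DefaultCamelContext;

    public class KafkaComponentConfigExample {
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();

            // pre-configure common options once on the component
            KafkaConfiguration config = new KafkaConfiguration();
            config.setBrokers("localhost:9092");   // assumed broker address
            config.setGroupId("my-group");         // assumed group id

            KafkaComponent kafka = new KafkaComponent();
            kafka.setConfiguration(config);
            context.addComponent("kafka", kafka);

            context.start();
            // endpoints such as kafka:orders now reuse brokers and groupId
            context.stop();
        }
    }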


http://git-wip-us.apache.org/repos/asf/camel/blob/6a87dad9/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d24345c..ffdb33f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -34,13 +34,14 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 3 options which are listed below.
+The Kafka component supports 4 options which are listed below.
 
 
 
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
+| configuration | common |  | KafkaConfiguration | Allows pre-configuring the Kafka component with common options that the endpoints will reuse.
 | brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1,host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | workerPool | advanced |  | ExecutorService | To use a shared custom worker pool to continue routing the Exchange after the kafka server has acknowledged the message that was sent to it from the KafkaProducer, using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
 | resolvePropertyPlaceholders | advanced | true | boolean | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.
@@ -64,27 +65,25 @@ with the following path and query parameters:
 | topic |  | String | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic.
 |=======================================================================
 
-#### Query Parameters (82 parameters):
+#### Query Parameters (81 parameters):
 
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | brokers | common |  | String | URL of the Kafka brokers to use. The format is host1:port1,host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
-| groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.
-| partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
 | autoCommitOnStop | consumer | sync | String | Whether to perform an explicit auto commit when the consumer stops to ensure the broker has a commit from the last consumed message. This requires the option autoCommitEnable is turned on. The possible values are: sync, async or none; sync is the default value.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest: automatically reset the offset to the smallest offset; largest: automatically reset the offset to the largest offset; fail: throw exception to the consumer
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | checkCrcs | consumer | true | Boolean | Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead so it may be disabled in cases seeking extreme performance.
-| consumerId | consumer |  | String | Generated automatically if not set.
 | consumerRequestTimeoutMs | consumer | 40000 | Integer | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
 | consumersCount | consumer | 1 | int | The number of consumers that connect to kafka server
 | consumerStreams | consumer | 10 | int | Number of concurrent consumers on the consumer
 | fetchMinBytes | consumer | 1024 | Integer | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
 | fetchWaitMaxMs | consumer | 500 | Integer | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
+| groupId | consumer |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.
 | heartbeatIntervalMs | consumer | 3000 | Integer | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
 | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition.
@@ -111,6 +110,7 @@ with the following path and query parameters:
 | metricReporters | producer |  | String | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
 | metricsSampleWindowMs | producer | 30000 | Integer | The window of time in ms that a metrics sample is computed over.
 | noOfMetricsSample | producer | 2 | Integer | The number of samples maintained to compute metrics.
+| partitioner | producer | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
 | partitionKey | producer |  | Integer | The partition to which the record will be sent (or null if no partition was specified). If this option has been configured then it takes precedence over the header KafkaConstants#PARTITION_KEY
 | producerBatchSize | producer | 16384 | Integer | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up by the producer when using async mode before either the producer must be blocked or data must be dropped.

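As a hedged illustration of the relabelled options above, groupId is now
documented as a consumer option and partitioner as a producer option. The
topic and endpoint names below are assumptions chosen for the example:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaRoutes extends RouteBuilder {
        @Override
        public void configure() {
            // groupId is a consumer option: it applies when reading from the topic
            from("kafka:orders?groupId=order-processors")
                .to("log:orders");

            // partitioner is a producer option: it applies when writing to the topic
            from("direct:out")
                .to("kafka:orders?partitioner="
                    + "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        }
    }
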
http://git-wip-us.apache.org/repos/asf/camel/blob/6a87dad9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index da0f59d..ca375df 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -26,9 +26,8 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.util.ObjectHelper;
 
 public class KafkaComponent extends UriEndpointComponent {
-    
-    @Metadata(label = "common")
-    private String brokers;
+
+    private KafkaConfiguration configuration;
 
     @Metadata(label = "advanced")
     private ExecutorService workerPool;
@@ -43,11 +42,17 @@ public class KafkaComponent extends UriEndpointComponent {
 
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
-        KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
-
         if (ObjectHelper.isEmpty(remaining)) {
             throw new IllegalArgumentException("Topic must be configured on endpoint using syntax kafka:topic");
         }
+
+        KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
+
+        if (configuration != null) {
+            KafkaConfiguration copy = configuration.copy();
+            endpoint.setConfiguration(copy);
+        }
+
         endpoint.getConfiguration().setTopic(remaining);
         endpoint.getConfiguration().setWorkerPool(getWorkerPool());
 
@@ -59,8 +64,19 @@ public class KafkaComponent extends UriEndpointComponent {
         return endpoint;
     }
 
+    public KafkaConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Allows pre-configuring the Kafka component with common options that the endpoints will reuse.
+     */
+    public void setConfiguration(KafkaConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
     public String getBrokers() {
-        return brokers;
+        return configuration != null ? configuration.getBrokers() : null;
     }
 
     /**
@@ -70,9 +86,13 @@ public class KafkaComponent extends UriEndpointComponent {
      * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
     public void setBrokers(String brokers) {
-        this.brokers = brokers;
+        if (configuration == null) {
+            configuration = new KafkaConfiguration();
+        }
+        configuration.setBrokers(brokers);
     }
 
+
     public ExecutorService getWorkerPool() {
         return workerPool;
     }

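The key behavioural detail above is that createEndpoint() hands each endpoint
a configuration.copy() rather than the shared instance. A small sketch of
what that buys (broker addresses and the "orders" topic are assumptions):

    // assuming a started CamelContext named "context"
    KafkaComponent component = new KafkaComponent();
    KafkaConfiguration shared = new KafkaConfiguration();
    shared.setBrokers("broker1:9092,broker2:9092");
    component.setConfiguration(shared);
    context.addComponent("kafka", component);

    // endpoint URI options are applied to the endpoint's private copy,
    // so the shared configuration is never mutated
    KafkaEndpoint orders = (KafkaEndpoint) context.getEndpoint("kafka:orders?groupId=orders");
    assert "orders".equals(orders.getConfiguration().getGroupId());
    assert shared.getGroupId() == null; // still unset on the shared instance
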
http://git-wip-us.apache.org/repos/asf/camel/blob/6a87dad9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 2afb3a8..dc3ae72 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spi.UriParam;
@@ -42,26 +43,23 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 
 @UriParams
-public class KafkaConfiguration {
+public class KafkaConfiguration implements Cloneable {
 
-    @UriPath @Metadata(required = "true")
+    //Common configuration properties
+    @UriPath(label = "common") @Metadata(required = "true")
     private String topic;
+    @UriParam(label = "common")
+    private String brokers;
+    @UriParam(label = "common")
+    private String clientId;
 
-    @UriParam
+    @UriParam(label = "consumer")
     private String groupId;
-    @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
-    private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
     @UriParam(label = "consumer", defaultValue = "10")
     private int consumerStreams = 10;
     @UriParam(label = "consumer", defaultValue = "1")
     private int consumersCount = 1;
 
-    //Common configuration properties
-    @UriParam(label = "common")
-    private String brokers;
-    @UriParam
-    private String clientId;
-
     //interceptor.classes
     @UriParam(label = "common,monitoring")
     private String interceptorClasses;
@@ -110,8 +108,6 @@ public class KafkaConfiguration {
     private String seekTo;
 
     //Consumer configuration properties
-    @UriParam(label = "consumer")
-    private String consumerId;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
     @UriParam(label = "consumer", defaultValue = "sync", enums = "sync,async,none")
@@ -120,6 +116,8 @@ public class KafkaConfiguration {
     private StateRepository<String, String> offsetRepository;
 
     //Producer configuration properties
+    @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
+    private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
     @UriParam(label = "producer", defaultValue = "100")
     private Integer retryBackoffMs = 100;
 
@@ -276,6 +274,18 @@ public class KafkaConfiguration {
     public KafkaConfiguration() {
     }
 
+    /**
+     * Returns a copy of this configuration
+     */
+    public KafkaConfiguration copy() {
+        try {
+            KafkaConfiguration copy = (KafkaConfiguration) clone();
+            return copy;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
     public Properties createProducerProperties() {
         Properties props = new Properties();
         addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass());
@@ -536,17 +546,6 @@ public class KafkaConfiguration {
         this.clientId = clientId;
     }
 
-    public String getConsumerId() {
-        return consumerId;
-    }
-
-    /**
-     * Generated automatically if not set.
-     */
-    public void setConsumerId(String consumerId) {
-        this.consumerId = consumerId;
-    }
-
     public Boolean isAutoCommitEnable() {
         return offsetRepository == null ? autoCommitEnable : false;
     }

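Note that copy() above relies on Object.clone(), which performs a shallow
field-by-field copy. That is adequate here because the configuration mostly
holds immutable values (String, Integer, Boolean), while reference-typed
fields such as the worker pool are deliberately shared between copies. A
minimal sketch of the same prototype pattern (class and field names are
illustrative, not from the commit):

    public class PrototypeConfig implements Cloneable {
        private String brokers; // immutable value, safe to share after clone

        public PrototypeConfig copy() {
            try {
                // shallow copy: every field is carried over as-is
                return (PrototypeConfig) clone();
            } catch (CloneNotSupportedException e) {
                // cannot happen: this class implements Cloneable
                throw new IllegalStateException(e);
            }
        }

        public String getBrokers() { return brokers; }
        public void setBrokers(String brokers) { this.brokers = brokers; }
    }
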
http://git-wip-us.apache.org/repos/asf/camel/blob/6a87dad9/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 1a70239..da5d483 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -17,7 +17,10 @@
 package org.apache.camel.component.kafka.springboot;
 
 import java.util.concurrent.ExecutorService;
+import org.apache.camel.spi.StateRepository;
+import org.apache.camel.util.jsse.SSLContextParameters;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.NestedConfigurationProperty;
 
 /**
  * The kafka component allows messages to be sent to (or consumed from) Apache
@@ -29,6 +32,11 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class KafkaComponentConfiguration {
 
     /**
+     * Allows pre-configuring the Kafka component with common options that the
+     * endpoints will reuse.
+     */
+    private KafkaConfigurationNestedConfiguration configuration;
+    /**
     * URL of the Kafka brokers to use. The format is host1:port1,host2:port2 and
      * the list can be a subset of brokers or a VIP pointing to a subset of
      * brokers. This option is known as bootstrap.servers in the Kafka
@@ -50,6 +58,15 @@ public class KafkaComponentConfiguration {
      */
     private Boolean resolvePropertyPlaceholders = true;
 
+    public KafkaConfigurationNestedConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(
+            KafkaConfigurationNestedConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
     public String getBrokers() {
         return brokers;
     }
@@ -74,4 +91,1141 @@ public class KafkaComponentConfiguration {
             Boolean resolvePropertyPlaceholders) {
         this.resolvePropertyPlaceholders = resolvePropertyPlaceholders;
     }
+
+    public static class KafkaConfigurationNestedConfiguration {
+        public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.kafka.KafkaConfiguration.class;
+        /**
+         * A string that uniquely identifies the group of consumer processes to
+         * which this consumer belongs. By setting the same group id multiple
+         * processes indicate that they are all part of the same consumer group.
+         * This option is required for consumers.
+         */
+        private String groupId;
+        /**
+         * The partitioner class for partitioning messages amongst sub-topics.
+         * The default partitioner is based on the hash of the key.
+         */
+        private String partitioner = org.apache.camel.component.kafka.KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
+        /**
+         * Name of the topic to use. On the consumer you can use comma to
+         * separate multiple topics. A producer can only send a message to a
+         * single topic.
+         */
+        private String topic;
+        /**
+         * Number of concurrent consumers on the consumer
+         */
+        private Integer consumerStreams;
+        /**
+         * The number of consumers that connect to kafka server
+         */
+        private Integer consumersCount;
+        /**
+         * The client id is a user-specified string sent in each request to help
+         * trace calls. It should logically identify the application making the
+         * request.
+         */
+        private String clientId;
+        /**
+         * If true, periodically commit to ZooKeeper the offset of messages
+         * already fetched by the consumer. This committed offset will be used
+         * when the process fails as the position from which the new consumer
+         * will begin.
+         */
+        private Boolean autoCommitEnable;
+        /**
+         * The offset repository to use in order to locally store the offset of
+         * each partition of the topic. Defining one will disable the
+         * autocommit.
+         */
+        private StateRepository offsetRepository;
+        /**
+         * The frequency in ms that the consumer offsets are committed to
+         * zookeeper.
+         */
+        private Integer autoCommitIntervalMs;
+        /**
+         * The minimum amount of data the server should return for a fetch
+         * request. If insufficient data is available the request will wait for
+         * that much data to accumulate before answering the request.
+         */
+        private Integer fetchMinBytes;
+        /**
+         * The maximum amount of time the server will block before answering the
+         * fetch request if there isn't sufficient data to immediately satisfy
+         * fetch.min.bytes
+         */
+        private Integer fetchWaitMaxMs;
+        /**
+         * What to do when there is no initial offset in ZooKeeper or if an
+         * offset is out of range: smallest: automatically reset the offset to
+         * the smallest offset; largest: automatically reset the offset to the
+         * largest offset; fail: throw exception to the consumer
+         */
+        private String autoOffsetReset = "latest";
+        /**
+         * Whether to perform an explicit auto commit when the consumer stops to
+         * ensure the broker has a commit from the last consumed message. This
+         * requires the option autoCommitEnable is turned on. The possible
+         * values are: sync, async, or none; sync is the default value.
+         */
+        private String autoCommitOnStop = "sync";
+        /**
+         * URL of the Kafka brokers to use. The format is
+         * host1:port1,host2:port2, and the list can be a subset of brokers or a
+         * VIP pointing to a subset of brokers.
+         * <p/>
+         * This option is known as <tt>bootstrap.servers</tt> in the Kafka
+         * documentation.
+         */
+        private String brokers;
+        /**
+         * This parameter allows you to specify the compression codec for all
+         * data generated by this producer. Valid values are "none", "gzip" and
+         * "snappy".
+         */
+        private String compressionCodec = "none";
+        /**
+         * Before each retry, the producer refreshes the metadata of relevant
+         * topics to see if a new leader has been elected. Since leader election
+         * takes a bit of time, this property specifies the amount of time that
+         * the producer waits before refreshing the metadata.
+         */
+        private Integer retryBackoffMs;
+        /**
+         * Socket write buffer size
+         */
+        private Integer sendBufferBytes;
+        /**
+         * The amount of time the broker will wait trying to meet the
+         * request.required.acks requirement before sending back an error to the
+         * client.
+         */
+        private Integer requestTimeoutMs;
+        /**
+         * The maximum number of unsent messages that can be queued up by the
+         * producer when using async mode before either the producer must be
+         * blocked or data must be dropped.
+         */
+        private Integer queueBufferingMaxMessages;
+        /**
+         * The serializer class for messages.
+         */
+        private String serializerClass = org.apache.camel.component.kafka.KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        /**
+         * The serializer class for keys (defaults to the same as for messages
+         * if nothing is given).
+         */
+        private String keySerializerClass = org.apache.camel.component.kafka.KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        /**
+         * Kerberos kinit command path. Default is /usr/bin/kinit
+         */
+        private String kerberosInitCmd = org.apache.kafka.common.config.SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
+        /**
+         * Login thread sleep time between refresh attempts.
+         */
+        private Integer kerberosBeforeReloginMinTime;
+        /**
+         * Percentage of random jitter added to the renewal time.
+         */
+        private Double kerberosRenewJitter;
+        /**
+         * Login thread will sleep until the specified window factor of time
+         * from last refresh to ticket's expiry has been reached, at which time
+         * it will try to renew the ticket.
+         */
+        private Double kerberosRenewWindowFactor;
+        /**
+         * A list of rules for mapping from principal names to short names
+         * (typically operating system usernames). The rules are evaluated in
+         * order and the first rule that matches a principal name is used to map
+         * it to a short name. Any later rules in the list are ignored. By
+         * default, principal names of the form {username}/{hostname}@{REALM}
+         * are mapped to {username}. For more details on the format please see
+         * <a href=\"#security_authz\"> security authorization and acls</a>.
+         * <p/>
+         * Multiple values can be separated by comma
+         */
+        private String kerberosPrincipalToLocalRules = "DEFAULT";
+        /**
+         * A list of cipher suites. This is a named combination of
+         * authentication, encryption, MAC and key exchange algorithm used to
+         * negotiate the security settings for a network connection using TLS or
+         * SSL network protocol. By default all the available cipher suites are
+         * supported.
+         */
+        private String sslCipherSuites;
+        /**
+         * The endpoint identification algorithm to validate server hostname
+         * using server certificate.
+         */
+        private String sslEndpointAlgorithm;
+        /**
+         * The algorithm used by key manager factory for SSL connections.
+         * Default value is the key manager factory algorithm configured for the
+         * Java Virtual Machine.
+         */
+        private String sslKeymanagerAlgorithm = "SunX509";
+        /**
+         * The algorithm used by trust manager factory for SSL connections.
+         * Default value is the trust manager factory algorithm configured for
+         * the Java Virtual Machine.
+         */
+        private String sslTrustmanagerAlgorithm = "PKIX";
+        /**
+         * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1
+         * and TLSv1 are enabled by default.
+         */
+        private String sslEnabledProtocols = org.apache.kafka.common.config.SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+        /**
+         * The file format of the key store file. This is optional for client.
+         * Default value is JKS
+         */
+        private String sslKeystoreType = org.apache.kafka.common.config.SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+        /**
+         * The SSL protocol used to generate the SSLContext. Default setting is
+         * TLS, which is fine for most cases. Allowed values in recent JVMs are
+         * TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in
+         * older JVMs, but their usage is discouraged due to known security
+         * vulnerabilities.
+         */
+        private String sslProtocol = org.apache.kafka.common.config.SslConfigs.DEFAULT_SSL_PROTOCOL;
+        /**
+         * The name of the security provider used for SSL connections. Default
+         * value is the default security provider of the JVM.
+         */
+        private String sslProvider;
+        /**
+         * The file format of the trust store file. Default value is JKS.
+         */
+        private String sslTruststoreType = org.apache.kafka.common.config.SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+        /**
+         * The Kerberos principal name that Kafka runs as. This can be defined
+         * either in Kafka's JAAS config or in Kafka's config.
+         */
+        private String saslKerberosServiceName;
+        /**
+         * The Simple Authentication and Security Layer (SASL) Mechanism used.
+         * For the valid values see <a href=
+         * "http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml"
+         * >http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.
+         * xhtml</a>
+         */
+        private String saslMechanism = org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_MECHANISM;
+        /**
+         * Protocol used to communicate with brokers. Currently only PLAINTEXT
+         * and SSL are supported.
+         */
+        private String securityProtocol = org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+        /**
+         * SSL configuration using a Camel {@link SSLContextParameters} object.
+         * If configured it's applied before the other SSL endpoint parameters.
+         */
+        @NestedConfigurationProperty
+        private SSLContextParameters sslContextParameters;
+        /**
+         * The password of the private key in the key store file. This is
+         * optional for client.
+         */
+        private String sslKeyPassword;
+        /**
+         * The location of the key store file. This is optional for client and
+         * can be used for two-way authentication for client.
+         */
+        private String sslKeystoreLocation;
+        /**
+         * The store password for the key store file. This is optional for client
+         * and only needed if ssl.keystore.location is configured.
+         */
+        private String sslKeystorePassword;
+        /**
+         * The location of the trust store file.
+         */
+        private String sslTruststoreLocation;
+        /**
+         * The password for the trust store file.
+         */
+        private String sslTruststorePassword;
+        /**
+         * The total bytes of memory the producer can use to buffer records
+         * waiting to be sent to the server. If records are sent faster than
+         * they can be delivered to the server the producer will either block or
+         * throw an exception based on the preference specified by
+         * block.on.buffer.full. This setting should correspond roughly to the
+         * total memory the producer will use, but is not a hard bound since not
+         * all memory the producer uses is used for buffering. Some additional
+         * memory will be used for compression (if compression is enabled) as
+         * well as for maintaining in-flight requests.
+         */
+        private Integer bufferMemorySize;
+        /**
+         * The record key (or null if no key is specified). If this option has
+         * been configured then it takes precedence over the header
+         * {@link KafkaConstants#KEY}
+         */
+        private String key;
+        /**
+         * The partition to which the record will be sent (or null if no
+         * partition was specified). If this option has been configured then it
+         * takes precedence over the header {@link KafkaConstants#PARTITION_KEY}
+         */
+        private Integer partitionKey;
+        /**
+         * The number of acknowledgments the producer requires the leader to
+         * have received before considering a request complete. This controls
+         * the durability of records that are sent. The following settings are
+         * common: acks=0 If set to zero then the producer will not wait for any
+         * acknowledgment from the server at all. The record will be immediately
+         * added to the socket buffer and considered sent. No guarantee can be
+         * made that the server has received the record in this case, and the
+         * retries configuration will not take effect (as the client won't
+         * generally know of any failures). The offset given back for each
+         * record will always be set to -1. acks=1 This will mean the leader
+         * will write the record to its local log but will respond without
+         * awaiting full acknowledgement from all followers. In this case should
+         * the leader fail immediately after acknowledging the record but before
+         * the followers have replicated it then the record will be lost.
+         * acks=all This means the leader will wait for the full set of in-sync
+         * replicas to acknowledge the record. This guarantees that the record
+         * will not be lost as long as at least one in-sync replica remains
+         * alive. This is the strongest available guarantee.
+         */
+        private String requestRequiredAcks = "1";
+        /**
+         * Setting a value greater than zero will cause the client to resend any
+         * record whose send fails with a potentially transient error. Note that
+         * this retry is no different than if the client resent the record upon
+         * receiving the error. Allowing retries will potentially change the
+         * ordering of records because if two records are sent to a single
+         * partition, and the first fails and is retried but the second
+         * succeeds, then the second record may appear first.
+         */
+        private Integer retries;
+        /**
+         * The producer will attempt to batch records together into fewer
+         * requests whenever multiple records are being sent to the same
+         * partition. This helps performance on both the client and the server.
+         * This configuration controls the default batch size in bytes. No
+         * attempt will be made to batch records larger than this size. Requests
+         * sent to brokers will contain multiple batches, one for each partition
+         * with data available to be sent. A small batch size will make batching
+         * less common and may reduce throughput (a batch size of zero will
+         * disable batching entirely). A very large batch size may use memory a
+         * bit more wastefully as we will always allocate a buffer of the
+         * specified batch size in anticipation of additional records.
+         */
+        private Integer producerBatchSize;
+        /**
+         * Close idle connections after the number of milliseconds specified by
+         * this config.
+         */
+        private Integer connectionMaxIdleMs;
+        /**
+         * The producer groups together any records that arrive in between
+         * request transmissions into a single batched request. Normally this
+         * occurs only under load when records arrive faster than they can be
+         * sent out. However in some circumstances the client may want to reduce
+         * the number of requests even under moderate load. This setting
+         * accomplishes this by adding a small amount of artificial delay; that
+         * is, rather than immediately sending out a record the producer will
+         * wait for up to the given delay to allow other records to be sent so
+         * that the sends can be batched together. This can be thought of as
+         * analogous to Nagle's algorithm in TCP. This setting gives the upper
+         * bound on the delay for batching: once we get batch.size worth of
+         * records for a partition it will be sent immediately regardless of
+         * this setting, however if we have fewer than this many bytes
+         * accumulated for this partition we will 'linger' for the specified
+         * time waiting for more records to show up. This setting defaults to 0
+         * (i.e. no delay). Setting linger.ms=5, for example, would have the
+         * effect of reducing the number of requests sent but would add up to
+         * 5ms of latency to records sent in the absence of load.
+         */
+        private Integer lingerMs;
+        /**
+         * The configuration controls how long sending to kafka will block.
+         * These methods can be blocked for multiple reasons, for example: buffer
+         * full, metadata unavailable. This configuration imposes a maximum limit
+         * on the total time spent in fetching metadata, serialization of key
+         * and value, partitioning and allocation of buffer memory when doing a
+         * send(). In case of partitionsFor(), this configuration imposes a
+         * maximum time threshold on waiting for metadata
+         */
+        private Integer maxBlockMs;
+        /**
+         * The maximum size of a request. This is also effectively a cap on the
+         * maximum record size. Note that the server has its own cap on record
+         * size which may be different from this. This setting will limit the
+         * number of record batches the producer will send in a single request
+         * to avoid sending huge requests.
+         */
+        private Integer maxRequestSize;
+        /**
+         * The size of the TCP receive buffer (SO_RCVBUF) to use when reading
+         * data.
+         */
+        private Integer receiveBufferBytes;
+        /**
+         * The maximum number of unacknowledged requests the client will send on
+         * a single connection before blocking. Note that if this setting is set
+         * to be greater than 1 and there are failed sends, there is a risk of
+         * message re-ordering due to retries (i.e., if retries are enabled).
+         */
+        private Integer maxInFlightRequest;
+        /**
+         * The period of time in milliseconds after which we force a refresh of
+         * metadata even if we haven't seen any partition leadership changes to
+         * proactively discover any new brokers or partitions.
+         */
+        private Integer metadataMaxAgeMs;
+        /**
+         * A list of classes to use as metrics reporters. Implementing the
+         * MetricReporter interface allows plugging in classes that will be
+         * notified of new metric creation. The JmxReporter is always included
+         * to register JMX statistics.
+         */
+        private String metricReporters;
+        /**
+         * The number of samples maintained to compute metrics.
+         */
+        private Integer noOfMetricsSample;
+        /**
+         * The window of time in ms that a metrics sample is computed over.
+         */
+        private Integer metricsSampleWindowMs;
+        /**
+         * The amount of time to wait before attempting to reconnect to a given
+         * host. This avoids repeatedly connecting to a host in a tight loop.
+         * This backoff applies to all requests sent by the consumer to the
+         * broker.
+         */
+        private Integer reconnectBackoffMs;
+        /**
+         * The expected time between heartbeats to the consumer coordinator when
+         * using Kafka's group management facilities. Heartbeats are used to
+         * ensure that the consumer's session stays active and to facilitate
+         * rebalancing when new consumers join or leave the group. The value
+         * must be set lower than session.timeout.ms, but typically should be
+         * set no higher than 1/3 of that value. It can be adjusted even lower
+         * to control the expected time for normal rebalances.
+         */
+        private Integer heartbeatIntervalMs;
+        /**
+         * The maximum amount of data per-partition the server will return. The
+         * maximum total memory used for a request will be #partitions *
+         * max.partition.fetch.bytes. This size must be at least as large as the
+         * maximum message size the server allows or else it is possible for the
+         * producer to send messages larger than the consumer can fetch. If that
+         * happens, the consumer can get stuck trying to fetch a large message
+         * on a certain partition.
+         */
+        private Integer maxPartitionFetchBytes;
+        /**
+         * The timeout used to detect failures when using Kafka's group
+         * management facilities.
+         */
+        private Integer sessionTimeoutMs;
+        /**
+         * The maximum number of records returned in a single call to poll()
+         */
+        private Integer maxPollRecords;
+        /**
+         * The timeout used when polling the KafkaConsumer.
+         */
+        private Long pollTimeoutMs;
+        /**
+         * The class name of the partition assignment strategy that the client
+         * will use to distribute partition ownership amongst consumer instances
+         * when group management is used
+         */
+        private String partitionAssignor = org.apache.camel.component.kafka.KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
+        /**
+         * The configuration controls the maximum amount of time the client will
+         * wait for the response of a request. If the response is not received
+         * before the timeout elapses the client will resend the request if
+         * necessary or fail the request if retries are exhausted.
+         */
+        private Integer consumerRequestTimeoutMs;
+        /**
+         * Automatically check the CRC32 of the records consumed. This ensures
+         * no on-the-wire or on-disk corruption to the messages occurred. This
+         * check adds some overhead, so it may be disabled in cases seeking
+         * extreme performance.
+         */
+        private Boolean checkCrcs;
+        /**
+         * Deserializer class for key that implements the Deserializer
+         * interface.
+         */
+        private String keyDeserializer = org.apache.camel.component.kafka.KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+        /**
+         * Deserializer class for value that implements the Deserializer
+         * interface.
+         */
+        private String valueDeserializer = org.apache.camel.component.kafka.KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+        /**
+         * Set if KafkaConsumer will read from beginning or end on startup:
+         * beginning: read from beginning; end: read from end. This replaces
+         * the earlier property seekToBeginning
+         */
+        private String seekTo;
+        /**
+         * To use a custom worker pool to continue routing the {@link Exchange}
+         * after the kafka server has acknowledged the message that was sent to it
+         * from {@link KafkaProducer} using asynchronous non-blocking
+         * processing.
+         */
+        private ExecutorService workerPool;
+        /**
+         * Number of core threads for the worker pool used to continue routing
+         * the {@link Exchange} after the kafka server has acknowledged the message that
+         * was sent to it from {@link KafkaProducer} using asynchronous
+         * non-blocking processing.
+         */
+        private Integer workerPoolCoreSize;
+        /**
+         * Maximum number of threads for the worker pool used to continue routing
+         * the {@link Exchange} after the kafka server has acknowledged the message that
+         * was sent to it from {@link KafkaProducer} using asynchronous
+         * non-blocking processing.
+         */
+        private Integer workerPoolMaxSize;
+        /**
+         * Whether the producer should store the {@link RecordMetadata} results
+         * from sending to Kafka. The results are stored in a {@link List}
+         * containing the {@link RecordMetadata} metadata. The list is stored
+         * on a header with the key {@link KafkaConstants#KAFKA_RECORDMETA}
+         */
+        private Boolean recordMetadata;
+        /**
+         * Sets interceptors for producer or consumers. Producer interceptors
+         * have to be classes implementing
+         * {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+         * Consumer interceptors have to be classes implementing
+         * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor} Note
+         * that if you use a Producer interceptor on a consumer it will throw a
+         * class cast exception at runtime
+         */
+        private String interceptorClasses;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public void setGroupId(String groupId) {
+            this.groupId = groupId;
+        }
+
+        public String getPartitioner() {
+            return partitioner;
+        }
+
+        public void setPartitioner(String partitioner) {
+            this.partitioner = partitioner;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getConsumerStreams() {
+            return consumerStreams;
+        }
+
+        public void setConsumerStreams(Integer consumerStreams) {
+            this.consumerStreams = consumerStreams;
+        }
+
+        public Integer getConsumersCount() {
+            return consumersCount;
+        }
+
+        public void setConsumersCount(Integer consumersCount) {
+            this.consumersCount = consumersCount;
+        }
+
+        public String getClientId() {
+            return clientId;
+        }
+
+        public void setClientId(String clientId) {
+            this.clientId = clientId;
+        }
+
+        public Boolean getAutoCommitEnable() {
+            return autoCommitEnable;
+        }
+
+        public void setAutoCommitEnable(Boolean autoCommitEnable) {
+            this.autoCommitEnable = autoCommitEnable;
+        }
+
+        public StateRepository getOffsetRepository() {
+            return offsetRepository;
+        }
+
+        public void setOffsetRepository(StateRepository offsetRepository) {
+            this.offsetRepository = offsetRepository;
+        }
+
+        public Integer getAutoCommitIntervalMs() {
+            return autoCommitIntervalMs;
+        }
+
+        public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+            this.autoCommitIntervalMs = autoCommitIntervalMs;
+        }
+
+        public Integer getFetchMinBytes() {
+            return fetchMinBytes;
+        }
+
+        public void setFetchMinBytes(Integer fetchMinBytes) {
+            this.fetchMinBytes = fetchMinBytes;
+        }
+
+        public Integer getFetchWaitMaxMs() {
+            return fetchWaitMaxMs;
+        }
+
+        public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+            this.fetchWaitMaxMs = fetchWaitMaxMs;
+        }
+
+        public String getAutoOffsetReset() {
+            return autoOffsetReset;
+        }
+
+        public void setAutoOffsetReset(String autoOffsetReset) {
+            this.autoOffsetReset = autoOffsetReset;
+        }
+
+        public String getAutoCommitOnStop() {
+            return autoCommitOnStop;
+        }
+
+        public void setAutoCommitOnStop(String autoCommitOnStop) {
+            this.autoCommitOnStop = autoCommitOnStop;
+        }
+
+        public String getBrokers() {
+            return brokers;
+        }
+
+        public void setBrokers(String brokers) {
+            this.brokers = brokers;
+        }
+
+        public String getCompressionCodec() {
+            return compressionCodec;
+        }
+
+        public void setCompressionCodec(String compressionCodec) {
+            this.compressionCodec = compressionCodec;
+        }
+
+        public Integer getRetryBackoffMs() {
+            return retryBackoffMs;
+        }
+
+        public void setRetryBackoffMs(Integer retryBackoffMs) {
+            this.retryBackoffMs = retryBackoffMs;
+        }
+
+        public Integer getSendBufferBytes() {
+            return sendBufferBytes;
+        }
+
+        public void setSendBufferBytes(Integer sendBufferBytes) {
+            this.sendBufferBytes = sendBufferBytes;
+        }
+
+        public Integer getRequestTimeoutMs() {
+            return requestTimeoutMs;
+        }
+
+        public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+            this.requestTimeoutMs = requestTimeoutMs;
+        }
+
+        public Integer getQueueBufferingMaxMessages() {
+            return queueBufferingMaxMessages;
+        }
+
+        public void setQueueBufferingMaxMessages(
+                Integer queueBufferingMaxMessages) {
+            this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+        }
+
+        public String getSerializerClass() {
+            return serializerClass;
+        }
+
+        public void setSerializerClass(String serializerClass) {
+            this.serializerClass = serializerClass;
+        }
+
+        public String getKeySerializerClass() {
+            return keySerializerClass;
+        }
+
+        public void setKeySerializerClass(String keySerializerClass) {
+            this.keySerializerClass = keySerializerClass;
+        }
+
+        public String getKerberosInitCmd() {
+            return kerberosInitCmd;
+        }
+
+        public void setKerberosInitCmd(String kerberosInitCmd) {
+            this.kerberosInitCmd = kerberosInitCmd;
+        }
+
+        public Integer getKerberosBeforeReloginMinTime() {
+            return kerberosBeforeReloginMinTime;
+        }
+
+        public void setKerberosBeforeReloginMinTime(
+                Integer kerberosBeforeReloginMinTime) {
+            this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
+        }
+
+        public Double getKerberosRenewJitter() {
+            return kerberosRenewJitter;
+        }
+
+        public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+            this.kerberosRenewJitter = kerberosRenewJitter;
+        }
+
+        public Double getKerberosRenewWindowFactor() {
+            return kerberosRenewWindowFactor;
+        }
+
+        public void setKerberosRenewWindowFactor(
+                Double kerberosRenewWindowFactor) {
+            this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
+        }
+
+        public String getKerberosPrincipalToLocalRules() {
+            return kerberosPrincipalToLocalRules;
+        }
+
+        public void setKerberosPrincipalToLocalRules(
+                String kerberosPrincipalToLocalRules) {
+            this.kerberosPrincipalToLocalRules = kerberosPrincipalToLocalRules;
+        }
+
+        public String getSslCipherSuites() {
+            return sslCipherSuites;
+        }
+
+        public void setSslCipherSuites(String sslCipherSuites) {
+            this.sslCipherSuites = sslCipherSuites;
+        }
+
+        public String getSslEndpointAlgorithm() {
+            return sslEndpointAlgorithm;
+        }
+
+        public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+            this.sslEndpointAlgorithm = sslEndpointAlgorithm;
+        }
+
+        public String getSslKeymanagerAlgorithm() {
+            return sslKeymanagerAlgorithm;
+        }
+
+        public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+            this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
+        }
+
+        public String getSslTrustmanagerAlgorithm() {
+            return sslTrustmanagerAlgorithm;
+        }
+
+        public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+            this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
+        }
+
+        public String getSslEnabledProtocols() {
+            return sslEnabledProtocols;
+        }
+
+        public void setSslEnabledProtocols(String sslEnabledProtocols) {
+            this.sslEnabledProtocols = sslEnabledProtocols;
+        }
+
+        public String getSslKeystoreType() {
+            return sslKeystoreType;
+        }
+
+        public void setSslKeystoreType(String sslKeystoreType) {
+            this.sslKeystoreType = sslKeystoreType;
+        }
+
+        public String getSslProtocol() {
+            return sslProtocol;
+        }
+
+        public void setSslProtocol(String sslProtocol) {
+            this.sslProtocol = sslProtocol;
+        }
+
+        public String getSslProvider() {
+            return sslProvider;
+        }
+
+        public void setSslProvider(String sslProvider) {
+            this.sslProvider = sslProvider;
+        }
+
+        public String getSslTruststoreType() {
+            return sslTruststoreType;
+        }
+
+        public void setSslTruststoreType(String sslTruststoreType) {
+            this.sslTruststoreType = sslTruststoreType;
+        }
+
+        public String getSaslKerberosServiceName() {
+            return saslKerberosServiceName;
+        }
+
+        public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+            this.saslKerberosServiceName = saslKerberosServiceName;
+        }
+
+        public String getSaslMechanism() {
+            return saslMechanism;
+        }
+
+        public void setSaslMechanism(String saslMechanism) {
+            this.saslMechanism = saslMechanism;
+        }
+
+        public String getSecurityProtocol() {
+            return securityProtocol;
+        }
+
+        public void setSecurityProtocol(String securityProtocol) {
+            this.securityProtocol = securityProtocol;
+        }
+
+        public SSLContextParameters getSslContextParameters() {
+            return sslContextParameters;
+        }
+
+        public void setSslContextParameters(
+                SSLContextParameters sslContextParameters) {
+            this.sslContextParameters = sslContextParameters;
+        }
+
+        public String getSslKeyPassword() {
+            return sslKeyPassword;
+        }
+
+        public void setSslKeyPassword(String sslKeyPassword) {
+            this.sslKeyPassword = sslKeyPassword;
+        }
+
+        public String getSslKeystoreLocation() {
+            return sslKeystoreLocation;
+        }
+
+        public void setSslKeystoreLocation(String sslKeystoreLocation) {
+            this.sslKeystoreLocation = sslKeystoreLocation;
+        }
+
+        public String getSslKeystorePassword() {
+            return sslKeystorePassword;
+        }
+
+        public void setSslKeystorePassword(String sslKeystorePassword) {
+            this.sslKeystorePassword = sslKeystorePassword;
+        }
+
+        public String getSslTruststoreLocation() {
+            return sslTruststoreLocation;
+        }
+
+        public void setSslTruststoreLocation(String sslTruststoreLocation) {
+            this.sslTruststoreLocation = sslTruststoreLocation;
+        }
+
+        public String getSslTruststorePassword() {
+            return sslTruststorePassword;
+        }
+
+        public void setSslTruststorePassword(String sslTruststorePassword) {
+            this.sslTruststorePassword = sslTruststorePassword;
+        }
+
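+        // Producer and shared client tuning options follow: these mirror
+        // Kafka client configs such as buffer.memory, acks, retries,
+        // batch.size, linger.ms, max.request.size and the metrics settings.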
+        public Integer getBufferMemorySize() {
+            return bufferMemorySize;
+        }
+
+        public void setBufferMemorySize(Integer bufferMemorySize) {
+            this.bufferMemorySize = bufferMemorySize;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public void setKey(String key) {
+            this.key = key;
+        }
+
+        public Integer getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionKey(Integer partitionKey) {
+            this.partitionKey = partitionKey;
+        }
+
+        public String getRequestRequiredAcks() {
+            return requestRequiredAcks;
+        }
+
+        public void setRequestRequiredAcks(String requestRequiredAcks) {
+            this.requestRequiredAcks = requestRequiredAcks;
+        }
+
+        public Integer getRetries() {
+            return retries;
+        }
+
+        public void setRetries(Integer retries) {
+            this.retries = retries;
+        }
+
+        public Integer getProducerBatchSize() {
+            return producerBatchSize;
+        }
+
+        public void setProducerBatchSize(Integer producerBatchSize) {
+            this.producerBatchSize = producerBatchSize;
+        }
+
+        public Integer getConnectionMaxIdleMs() {
+            return connectionMaxIdleMs;
+        }
+
+        public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+            this.connectionMaxIdleMs = connectionMaxIdleMs;
+        }
+
+        public Integer getLingerMs() {
+            return lingerMs;
+        }
+
+        public void setLingerMs(Integer lingerMs) {
+            this.lingerMs = lingerMs;
+        }
+
+        public Integer getMaxBlockMs() {
+            return maxBlockMs;
+        }
+
+        public void setMaxBlockMs(Integer maxBlockMs) {
+            this.maxBlockMs = maxBlockMs;
+        }
+
+        public Integer getMaxRequestSize() {
+            return maxRequestSize;
+        }
+
+        public void setMaxRequestSize(Integer maxRequestSize) {
+            this.maxRequestSize = maxRequestSize;
+        }
+
+        public Integer getReceiveBufferBytes() {
+            return receiveBufferBytes;
+        }
+
+        public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+            this.receiveBufferBytes = receiveBufferBytes;
+        }
+
+        public Integer getMaxInFlightRequest() {
+            return maxInFlightRequest;
+        }
+
+        public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+            this.maxInFlightRequest = maxInFlightRequest;
+        }
+
+        public Integer getMetadataMaxAgeMs() {
+            return metadataMaxAgeMs;
+        }
+
+        public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+            this.metadataMaxAgeMs = metadataMaxAgeMs;
+        }
+
+        public String getMetricReporters() {
+            return metricReporters;
+        }
+
+        public void setMetricReporters(String metricReporters) {
+            this.metricReporters = metricReporters;
+        }
+
+        public Integer getNoOfMetricsSample() {
+            return noOfMetricsSample;
+        }
+
+        public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+            this.noOfMetricsSample = noOfMetricsSample;
+        }
+
+        public Integer getMetricsSampleWindowMs() {
+            return metricsSampleWindowMs;
+        }
+
+        public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+            this.metricsSampleWindowMs = metricsSampleWindowMs;
+        }
+
+        public Integer getReconnectBackoffMs() {
+            return reconnectBackoffMs;
+        }
+
+        public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+            this.reconnectBackoffMs = reconnectBackoffMs;
+        }
+
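+        // Consumer options follow: these mirror Kafka client configs such as
+        // heartbeat.interval.ms, session.timeout.ms, max.poll.records and
+        // partition.assignment.strategy, plus Camel's own seekTo option.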
+        public Integer getHeartbeatIntervalMs() {
+            return heartbeatIntervalMs;
+        }
+
+        public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+        }
+
+        public Integer getMaxPartitionFetchBytes() {
+            return maxPartitionFetchBytes;
+        }
+
+        public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+            this.maxPartitionFetchBytes = maxPartitionFetchBytes;
+        }
+
+        public Integer getSessionTimeoutMs() {
+            return sessionTimeoutMs;
+        }
+
+        public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+        }
+
+        public Integer getMaxPollRecords() {
+            return maxPollRecords;
+        }
+
+        public void setMaxPollRecords(Integer maxPollRecords) {
+            this.maxPollRecords = maxPollRecords;
+        }
+
+        public Long getPollTimeoutMs() {
+            return pollTimeoutMs;
+        }
+
+        public void setPollTimeoutMs(Long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
+        }
+
+        public String getPartitionAssignor() {
+            return partitionAssignor;
+        }
+
+        public void setPartitionAssignor(String partitionAssignor) {
+            this.partitionAssignor = partitionAssignor;
+        }
+
+        public Integer getConsumerRequestTimeoutMs() {
+            return consumerRequestTimeoutMs;
+        }
+
+        public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+            this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
+        }
+
+        public Boolean getCheckCrcs() {
+            return checkCrcs;
+        }
+
+        public void setCheckCrcs(Boolean checkCrcs) {
+            this.checkCrcs = checkCrcs;
+        }
+
+        public String getKeyDeserializer() {
+            return keyDeserializer;
+        }
+
+        public void setKeyDeserializer(String keyDeserializer) {
+            this.keyDeserializer = keyDeserializer;
+        }
+
+        public String getValueDeserializer() {
+            return valueDeserializer;
+        }
+
+        public void setValueDeserializer(String valueDeserializer) {
+            this.valueDeserializer = valueDeserializer;
+        }
+
+        public String getSeekTo() {
+            return seekTo;
+        }
+
+        public void setSeekTo(String seekTo) {
+            this.seekTo = seekTo;
+        }
+
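+        // Remaining options: the worker pool settings and recordMetadata are
+        // Camel-specific (async continuation of routing, storing RecordMetadata
+        // results), while interceptorClasses maps to Kafka's interceptor.classes.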
+        public ExecutorService getWorkerPool() {
+            return workerPool;
+        }
+
+        public void setWorkerPool(ExecutorService workerPool) {
+            this.workerPool = workerPool;
+        }
+
+        public Integer getWorkerPoolCoreSize() {
+            return workerPoolCoreSize;
+        }
+
+        public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+            this.workerPoolCoreSize = workerPoolCoreSize;
+        }
+
+        public Integer getWorkerPoolMaxSize() {
+            return workerPoolMaxSize;
+        }
+
+        public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+            this.workerPoolMaxSize = workerPoolMaxSize;
+        }
+
+        public Boolean getRecordMetadata() {
+            return recordMetadata;
+        }
+
+        public void setRecordMetadata(Boolean recordMetadata) {
+            this.recordMetadata = recordMetadata;
+        }
+
+        public String getInterceptorClasses() {
+            return interceptorClasses;
+        }
+
+        public void setInterceptorClasses(String interceptorClasses) {
+            this.interceptorClasses = interceptorClasses;
+        }
+    }
 }
\ No newline at end of file
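

For reference, a minimal sketch (not part of this commit) of how the new
component-level configuration could be wired up in Java. The CamelContext
variable and all option values below are assumptions for illustration;
KafkaComponent#setConfiguration is the option introduced by this change, and
the KafkaConfiguration setters shown correspond to the accessors in the diff
above:

    // Pre-configure the component once; every kafka: endpoint created from it
    // reuses these options unless overridden on the endpoint URI.
    KafkaConfiguration config = new KafkaConfiguration();
    config.setBrokers("localhost:9092");                              // bootstrap.servers
    config.setSecurityProtocol("SSL");                                // security.protocol
    config.setSslTruststoreLocation("/etc/ssl/kafka-truststore.jks"); // hypothetical path
    config.setSslTruststorePassword("changeit");                      // hypothetical value
    config.setRetries(3);                                             // producer retries
    config.setSessionTimeoutMs(30000);                                // consumer session.timeout.ms

    KafkaComponent kafka = new KafkaComponent();
    kafka.setConfiguration(config);
    camelContext.addComponent("kafka", kafka);

Under Spring Boot, the nested configuration class whose accessors appear above
binds the same options from properties, e.g.
camel.component.kafka.configuration.ssl-truststore-location (assuming the
standard camel.component.kafka prefix used by the generated configuration
classes).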