You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/09 10:10:56 UTC

[3/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.

CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2aa831d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2aa831d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2aa831d

Branch: refs/heads/master
Commit: b2aa831da8c8f78f7d6ca908c5b33957bbc7fa24
Parents: 038e161
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 09:58:36 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:01:59 2016 +0100

----------------------------------------------------------------------
 components/camel-kafka/pom.xml                  |   26 +-
 .../camel/component/kafka/KafkaComponent.java   |   22 +-
 .../component/kafka/KafkaConfiguration.java     | 1204 ++++++++++++------
 .../camel/component/kafka/KafkaConstants.java   |   13 +-
 .../camel/component/kafka/KafkaConsumer.java    |  183 +--
 .../camel/component/kafka/KafkaEndpoint.java    |  598 +++++----
 .../camel/component/kafka/KafkaProducer.java    |   64 +-
 .../component/kafka/BaseEmbeddedKafkaTest.java  |   21 +-
 .../component/kafka/KafkaComponentTest.java     |  224 ++--
 .../kafka/KafkaConsumerBatchSizeTest.java       |   64 +-
 .../component/kafka/KafkaConsumerFullTest.java  |   34 +-
 .../component/kafka/KafkaConsumerTest.java      |    8 +-
 .../component/kafka/KafkaEndpointTest.java      |   13 +-
 .../component/kafka/KafkaProducerFullTest.java  |  145 +--
 .../component/kafka/KafkaProducerTest.java      |   65 +-
 .../component/kafka/SimplePartitioner.java      |   39 -
 .../kafka/embedded/EmbeddedKafkaCluster.java    |   12 +-
 .../src/test/resources/log4j.properties         |    2 +-
 18 files changed, 1618 insertions(+), 1119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 1a5f98f..f8bbdb5 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -15,7 +15,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
@@ -37,31 +38,35 @@
   </properties>
 
   <dependencies>
+
+    <!-- camel -->
     <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
     </dependency>
+
+    <!-- kafka java client -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka-version}</version>
+    </dependency>
+
+    <!-- kafka server for testing using scala -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>${kafka-version}</version>
+      <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-simple</artifactId>
         </exclusion>
-          <exclusion>
-              <artifactId>scala-library</artifactId>
-              <groupId>org.scala-lang</groupId>
-          </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
 
+    <!-- test -->
     <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-test</artifactId>
@@ -77,6 +82,7 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c9d4c2a..2981b3f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -20,8 +20,6 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.UriEndpointComponent;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.EndpointHelper;
 
 public class KafkaComponent extends UriEndpointComponent {
 
@@ -34,30 +32,14 @@ public class KafkaComponent extends UriEndpointComponent {
     }
 
     @Override
-    protected KafkaEndpoint createEndpoint(String uri,
-                                           String remaining,
-                                           Map<String, Object> params) throws Exception {
-
+    protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
         String brokers = remaining.split("\\?")[0];
-        Object confparam = params.get("configuration");
-        if (confparam != null) {
-            // need a special handling to resolve the reference before other parameters are set/merged into the config
-            KafkaConfiguration confobj = null;
-            if (confparam instanceof KafkaConfiguration) {
-                confobj = (KafkaConfiguration)confparam;
-            } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) { 
-                confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1));
-            }
-            if (confobj != null) {
-                endpoint.setConfiguration(confobj.copy());
-            }
-            params.remove("configuration");
-        }
         if (brokers != null) {
             endpoint.getConfiguration().setBrokers(brokers);
         }
         setProperties(endpoint, params);
         return endpoint;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 894df0c..4a948c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -18,28 +18,28 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.producer.DefaultPartitioner;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 
 @UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration {
+
+    @UriPath @Metadata(required = "true")
+    private String brokers;
 
-    @UriParam
-    private String zookeeperConnect;
-    @UriParam
-    private String zookeeperHost;
-    @UriParam(defaultValue = "2181")
-    private int zookeeperPort = 2181;
     @UriParam @Metadata(required = "true")
     private String topic;
     @UriParam
     private String groupId;
-    @UriParam(defaultValue = "DefaultPartitioner")
-    private String partitioner = DefaultPartitioner.class.getCanonicalName();
+    @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
+    private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
     @UriParam(label = "consumer", defaultValue = "10")
     private int consumerStreams = 10;
     @UriParam(label = "consumer", defaultValue = "1")
@@ -53,134 +53,302 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam
     private String clientId;
 
+    //key.deserializer
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+    private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+    //value.deserializer
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+    private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+    //fetch.min.bytes
+    @UriParam(label = "consumer", defaultValue = "1024")
+    private Integer fetchMinBytes = 1024;
+    //heartbeat.interval.ms
+    @UriParam(label = "consumer", defaultValue = "3000")
+    private Integer heartbeatIntervalMs = 3000;
+    //max.partition.fetch.bytes
+    @UriParam(label = "consumer", defaultValue = "1048576")
+    private Integer maxPartitionFetchBytes = 1048576;
+    //session.timeout.ms
+    @UriParam(label = "consumer", defaultValue = "30000")
+    private Integer sessionTimeoutMs = 30000;
+    //auto.offset.reset
+    @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
+    private String autoOffsetReset = "latest";
+    //partition.assignment.strategy
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR)
+    private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
+    //request.timeout.ms
+    @UriParam(label = "consumer", defaultValue = "40000")
+    private Integer consumerRequestTimeoutMs = 40000;
+    //auto.commit.interval.ms
+    @UriParam(label = "consumer", defaultValue = "5000")
+    private Integer autoCommitIntervalMs = 5000;
+    //check.crcs
+    @UriParam(label = "consumer", defaultValue = "true")
+    private Boolean checkCrcs = true;
+    //fetch.max.wait.ms
+    @UriParam(label = "consumer", defaultValue = "500")
+    private Integer fetchWaitMaxMs = 500;
+
     //Consumer configuration properties
     @UriParam(label = "consumer")
     private String consumerId;
-    @UriParam(label = "consumer", defaultValue = "30000")
-    private Integer socketTimeoutMs = 30 * 1000;
-    @UriParam(label = "consumer", defaultValue = "" + 64 * 1024)
-    private Integer socketReceiveBufferBytes = 64 * 1024;
-    @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024)
-    private Integer fetchMessageMaxBytes = 1024 * 1024;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
-    @UriParam(label = "consumer", defaultValue = "60000")
-    private Integer autoCommitIntervalMs = 60 * 1000;
-    @UriParam(label = "consumer", defaultValue = "2")
-    private Integer queuedMaxMessageChunks = 2;
-    @UriParam(label = "consumer", defaultValue = "4")
-    private Integer rebalanceMaxRetries = 4;
-    @UriParam(label = "consumer", defaultValue = "1")
-    private Integer fetchMinBytes = 1;
-    @UriParam(label = "consumer", defaultValue = "100")
-    private Integer fetchWaitMaxMs = 100;
-    @UriParam(label = "consumer", defaultValue = "2000")
-    private Integer rebalanceBackoffMs = 2000;
-    @UriParam(label = "consumer", defaultValue = "200")
-    private Integer refreshLeaderBackoffMs = 200;
-    @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail")
-    private String autoOffsetReset = "largest";
-    @UriParam(label = "consumer")
-    private Integer consumerTimeoutMs;
-    @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
-    private String offsetsStorage = "zookeeper";
-    @UriParam(label = "consumer", defaultValue = "true")
-    private Boolean dualCommitEnabled = true;
-
-    //Zookeepr configuration properties
-    @UriParam
-    private Integer zookeeperSessionTimeoutMs;
-    @UriParam
-    private Integer zookeeperConnectionTimeoutMs;
-    @UriParam
-    private Integer zookeeperSyncTimeMs;
 
     //Producer configuration properties
-    @UriPath
-    private String brokers;
-    @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
-    private String producerType = "sync";
-    @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy")
-    private String compressionCodec = "none";
-    @UriParam(label = "producer")
-    private String compressedTopics;
-    @UriParam(label = "producer", defaultValue = "3")
-    private Integer messageSendMaxRetries = 3;
     @UriParam(label = "producer", defaultValue = "100")
     private Integer retryBackoffMs = 100;
-    @UriParam(label = "producer", defaultValue = "600000")
-    private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
-
-    //Sync producer config
-    @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
-    private Integer sendBufferBytes = 100 * 1024;
-    @UriParam(label = "producer", defaultValue = "0")
-    private short requestRequiredAcks;
-    @UriParam(label = "producer", defaultValue = "10000")
-    private Integer requestTimeoutMs = 10000;
 
     //Async producer config
-    @UriParam(label = "producer", defaultValue = "5000")
-    private Integer queueBufferingMaxMs = 5000;
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
     @UriParam(label = "producer")
-    private Integer queueEnqueueTimeoutMs;
-    @UriParam(label = "producer", defaultValue = "200")
-    private Integer batchNumMessages = 200;
-    @UriParam(label = "producer")
     private String serializerClass;
     @UriParam(label = "producer")
     private String keySerializerClass;
 
+    @UriParam(label = "producer", defaultValue = "1")
+    private Integer requestRequiredAcks = 1;
+    //buffer.memory
+    @UriParam(label = "producer", defaultValue = "33554432")
+    private Integer bufferMemorySize = 33554432;
+    //compression.type
+    @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4")
+    private String compressionCodec = "none";
+    //retries
+    @UriParam(label = "producer", defaultValue = "0")
+    private Integer retries = 0;
+    // SSL
+    // ssl.key.password
+    @UriParam(label = "producer")
+    private String sslKeyPassword;
+    // ssl.keystore.location
+    @UriParam(label = "producer")
+    private String sslKeystoreLocation;
+    // ssl.keystore.password
+    @UriParam(label = "producer")
+    private String sslKeystorePassword;
+    //ssl.truststore.location
+    @UriParam(label = "producer")
+    private String sslTruststoreLocation;
+    //ssl.truststore.password
+    @UriParam(label = "producer")
+    private String sslTruststorePassword;
+    //batch.size
+    @UriParam(label = "producer", defaultValue = "16384")
+    private Integer producerBatchSize = 16384;
+    //connections.max.idle.ms
+    @UriParam(label = "producer", defaultValue = "540000")
+    private Integer connectionMaxIdleMs = 540000;
+    //linger.ms
+    @UriParam(label = "producer", defaultValue = "0")
+    private Integer lingerMs = 0;
+    //linger.ms
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer maxBlockMs = 60000;
+    //max.request.size
+    @UriParam(label = "producer", defaultValue = "1048576")
+    private Integer maxRequestSize = 1048576;
+    //receive.buffer.bytes
+    @UriParam(label = "producer", defaultValue = "32768")
+    private Integer receiveBufferBytes = 32768;
+    //request.timeout.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer requestTimeoutMs = 30000;
+    // SASL & sucurity Protocol
+    //sasl.kerberos.service.name
+    @UriParam(label = "producer")
+    private String saslKerberosServiceName;
+    //security.protocol
+    @UriParam(label = "producer", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL)
+    private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+    //send.buffer.bytes
+    @UriParam(label = "producer", defaultValue = "131072")
+    private Integer sendBufferBytes = 131072;
+    //SSL
+    //ssl.enabled.protocols
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)
+    private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+    //ssl.keystore.type
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE)
+    private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+    //ssl.protocol
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_PROTOCOL)
+    private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
+    //ssl.provider
+    @UriParam(label = "producer")
+    private String sslProvider;
+    //ssl.truststore.type
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
+    private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+    //timeout.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer timeoutMs = 30000;
+    //block.on.buffer.full
+    @UriParam(label = "producer", defaultValue = "false")
+    private Boolean blockOnBufferFull = false;
+    //max.in.flight.requests.per.connection
+    @UriParam(label = "producer", defaultValue = "5")
+    private Integer maxInFlightRequest = 5;
+    //metadata.fetch.timeout.ms
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer metadataFetchTimeoutMs = 600 * 1000;
+    //metadata.max.age.ms
+    @UriParam(label = "producer", defaultValue = "300000")
+    private Integer metadataMaxAgeMs = 300000;
+    //metric.reporters
+    @UriParam(label = "producer")
+    private String metricReporters;
+    //metrics.num.samples
+    @UriParam(label = "producer", defaultValue = "2")
+    private Integer noOfMetricsSample = 2;
+    //metrics.sample.window.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer metricsSampleWindowMs = 30000;
+    //reconnect.backoff.ms
+    @UriParam(label = "producer", defaultValue = "50")
+    private Integer reconnectBackoffMs = 50;
+    //SASL
+    //sasl.kerberos.kinit.cmd
+    @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
+    private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
+    //sasl.kerberos.min.time.before.relogin
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer kerberosBeforeReloginMinTime = 60000;
+    //sasl.kerberos.ticket.renew.jitter
+    @UriParam(label = "producer", defaultValue = "0.05")
+    private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
+    //sasl.kerberos.ticket.renew.window.factor
+    @UriParam(label = "producer", defaultValue = "0.8")
+    private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
+    //SSL
+    //ssl.cipher.suites
+    @UriParam(label = "producer")
+    private String sslCipherSuites;
+    //ssl.endpoint.identification.algorithm
+    @UriParam(label = "producer")
+    private String sslEndpointAlgorithm;
+    //ssl.keymanager.algorithm
+    @UriParam(label = "producer", defaultValue = "SunX509")
+    private String sslKeymanagerAlgorithm = "SunX509";
+    //ssl.trustmanager.algorithm
+    @UriParam(label = "producer", defaultValue = "PKIX")
+    private String sslTrustmanagerAlgorithm = "PKIX";
+
     public KafkaConfiguration() {
     }
 
     public Properties createProducerProperties() {
         Properties props = new Properties();
-        addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks());
-        addPropertyIfNotNull(props, "partitioner.class", getPartitioner());
-        addPropertyIfNotNull(props, "serializer.class", getSerializerClass());
-        addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass());
-        addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs());
-        addPropertyIfNotNull(props, "producer.type", getProducerType());
-        addPropertyIfNotNull(props, "compression.codec", getCompressionCodec());
-        addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics());
-        addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries());
-        addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs());
-        addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
-        addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs());
-        addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
-        addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
-        addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages());
-        addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes());
-        addPropertyIfNotNull(props, "client.id", getClientId());
+        addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass());
+        addPropertyIfNotNull(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializerClass());
+        addPropertyIfNotNull(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks());
+        addPropertyIfNotNull(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize());
+        addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
+        addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
+        // SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getRetries());
+        addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
+        addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
+        addPropertyIfNotNull(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+        addPropertyIfNotNull(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize());
+        addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
+        addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+        addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+        // Security protocol
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
+        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
+        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
+        addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+        addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+        addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+        addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+        addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+        addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+
         return props;
     }
 
     public Properties createConsumerProperties() {
         Properties props = new Properties();
-        addPropertyIfNotNull(props, "consumer.id", getConsumerId());
-        addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
-        addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
-        addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
-        addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
-        addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
-        addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages());
-        addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
-        addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
-        addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks());
-        addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
-        addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
-        addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
-        addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
-        addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
-        addPropertyIfNotNull(props, "client.id", getClientId());
-        addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
-        addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
-        addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
-        addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
-        addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
+        addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
+        addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
+        addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+        addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        // SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
+        addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+        addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
+        addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+        addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+        // Security protocol
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
+        addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
+        addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
+        addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs());
+        addPropertyIfNotNull(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+        addPropertyIfNotNull(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+        addPropertyIfNotNull(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+        addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+        addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+        addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         return props;
     }
 
@@ -191,65 +359,6 @@ public class KafkaConfiguration implements Cloneable {
         }
     }
 
-    public String getZookeeperConnect() {
-        if (this.zookeeperConnect != null) {
-            return zookeeperConnect;
-        } else {
-            return getZookeeperHost() + ":" + getZookeeperPort();
-        }
-    }
-
-    /**
-     * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.
-     * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the
-     * form hostname1:port1,hostname2:port2,hostname3:port3.
-     * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data
-     * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string.
-     * For example to give a chroot path of /chroot/path you would give the connection
-     * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
-     */
-    public void setZookeeperConnect(String zookeeperConnect) {
-        this.zookeeperConnect = zookeeperConnect;
-
-        // connect overrides host and port
-        this.zookeeperHost = null;
-        this.zookeeperPort = -1;
-    }
-
-    public String getZookeeperHost() {
-        return zookeeperHost;
-    }
-
-    /**
-     * The zookeeper host to use.
-     * <p/>
-     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
-     * <p/>
-     * This option can only be used if zookeeperConnect is not in use.
-     */
-    public void setZookeeperHost(String zookeeperHost) {
-        if (this.zookeeperConnect == null) {
-            this.zookeeperHost = zookeeperHost;
-        }
-    }
-
-    public int getZookeeperPort() {
-        return zookeeperPort;
-    }
-
-    /**
-     * The zookeeper port to use
-     * <p/>
-     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
-     * <p/>
-     * This option can only be used if zookeeperConnect is not in use.
-     */
-    public void setZookeeperPort(int zookeeperPort) {
-        if (this.zookeeperConnect == null) {
-            this.zookeeperPort = zookeeperPort;
-        }
-    }
-
     public String getGroupId() {
         return groupId;
     }
@@ -278,7 +387,7 @@ public class KafkaConfiguration implements Cloneable {
     }
 
     /**
-     * Name of the topic to use
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -351,440 +460,741 @@ public class KafkaConfiguration implements Cloneable {
         this.consumerId = consumerId;
     }
 
-    public Integer getSocketTimeoutMs() {
-        return socketTimeoutMs;
+    public Boolean isAutoCommitEnable() {
+        return autoCommitEnable;
     }
 
     /**
-     * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.
+     * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
+     * This committed offset will be used when the process fails as the position from which the new consumer will begin.
      */
-    public void setSocketTimeoutMs(Integer socketTimeoutMs) {
-        this.socketTimeoutMs = socketTimeoutMs;
+    public void setAutoCommitEnable(Boolean autoCommitEnable) {
+        this.autoCommitEnable = autoCommitEnable;
     }
 
-    public Integer getSocketReceiveBufferBytes() {
-        return socketReceiveBufferBytes;
+    public Integer getAutoCommitIntervalMs() {
+        return autoCommitIntervalMs;
     }
 
     /**
-     * The socket receive buffer for network requests
+     * The frequency in ms that the consumer offsets are committed to zookeeper.
      */
-    public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
-        this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+        this.autoCommitIntervalMs = autoCommitIntervalMs;
     }
 
-    public Integer getFetchMessageMaxBytes() {
-        return fetchMessageMaxBytes;
+    public Integer getFetchMinBytes() {
+        return fetchMinBytes;
     }
 
     /**
-     * The number of byes of messages to attempt to fetch for each topic-partition in each fetch request.
-     * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
-     * The fetch request size must be at least as large as the maximum message size the server allows or else it
-     * is possible for the producer to send messages larger than the consumer can fetch.
+     * The minimum amount of data the server should return for a fetch request.
+     * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
      */
-    public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
-        this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+    public void setFetchMinBytes(Integer fetchMinBytes) {
+        this.fetchMinBytes = fetchMinBytes;
     }
 
-    public Boolean isAutoCommitEnable() {
-        return autoCommitEnable;
+    public Integer getFetchWaitMaxMs() {
+        return fetchWaitMaxMs;
     }
 
     /**
-     * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
-     * This committed offset will be used when the process fails as the position from which the new consumer will begin.
+     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
      */
-    public void setAutoCommitEnable(Boolean autoCommitEnable) {
-        this.autoCommitEnable = autoCommitEnable;
+    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+        this.fetchWaitMaxMs = fetchWaitMaxMs;
     }
 
-    public Integer getAutoCommitIntervalMs() {
-        return autoCommitIntervalMs;
+    public String getAutoOffsetReset() {
+        return autoOffsetReset;
     }
 
     /**
-     * The frequency in ms that the consumer offsets are committed to zookeeper.
+     * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
+     * smallest : automatically reset the offset to the smallest offset
+     * largest : automatically reset the offset to the largest offset
+     * fail: throw exception to the consumer
      */
-    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
-        this.autoCommitIntervalMs = autoCommitIntervalMs;
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        this.autoOffsetReset = autoOffsetReset;
     }
 
-    public Integer getQueuedMaxMessageChunks() {
-        return queuedMaxMessageChunks;
+    public String getBrokers() {
+        return brokers;
     }
 
     /**
-     * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
+     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
      */
-    public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
-        this.queuedMaxMessageChunks = queuedMaxMessageChunks;
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
     }
 
-    public Integer getRebalanceMaxRetries() {
-        return rebalanceMaxRetries;
+    public String getCompressionCodec() {
+        return compressionCodec;
     }
 
     /**
-     * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
-     * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
-     * This setting controls the maximum number of attempts before giving up.
+     * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
      */
-    public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
-        this.rebalanceMaxRetries = rebalanceMaxRetries;
+    public void setCompressionCodec(String compressionCodec) {
+        this.compressionCodec = compressionCodec;
     }
 
-    public Integer getFetchMinBytes() {
-        return fetchMinBytes;
+    public Integer getRetryBackoffMs() {
+        return retryBackoffMs;
     }
 
     /**
-     * The minimum amount of data the server should return for a fetch request.
-     * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+     * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
+     * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
      */
-    public void setFetchMinBytes(Integer fetchMinBytes) {
-        this.fetchMinBytes = fetchMinBytes;
+    public void setRetryBackoffMs(Integer retryBackoffMs) {
+        this.retryBackoffMs = retryBackoffMs;
     }
 
-    public Integer getFetchWaitMaxMs() {
-        return fetchWaitMaxMs;
+    public Integer getSendBufferBytes() {
+        return sendBufferBytes;
     }
 
     /**
-     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
+     * Socket write buffer size
      */
-    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
-        this.fetchWaitMaxMs = fetchWaitMaxMs;
+    public void setSendBufferBytes(Integer sendBufferBytes) {
+        this.sendBufferBytes = sendBufferBytes;
     }
 
-    public Integer getRebalanceBackoffMs() {
-        return rebalanceBackoffMs;
+    public Integer getRequestTimeoutMs() {
+        return requestTimeoutMs;
     }
 
     /**
-     * Backoff time between retries during rebalance.
+     * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
      */
-    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
-        this.rebalanceBackoffMs = rebalanceBackoffMs;
+    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+        this.requestTimeoutMs = requestTimeoutMs;
     }
 
-    public Integer getRefreshLeaderBackoffMs() {
-        return refreshLeaderBackoffMs;
+    public Integer getQueueBufferingMaxMessages() {
+        return queueBufferingMaxMessages;
     }
 
     /**
-     * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
+     * The maximum number of unsent messages that can be queued up the producer when using async
+     * mode before either the producer must be blocked or data must be dropped.
      */
-    public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
-        this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+        this.queueBufferingMaxMessages = queueBufferingMaxMessages;
     }
 
-    public String getAutoOffsetReset() {
-        return autoOffsetReset;
+    public String getSerializerClass() {
+        if (serializerClass == null) {
+            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        }
+        return serializerClass;
     }
 
     /**
-     * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
-     * smallest : automatically reset the offset to the smallest offset
-     * largest : automatically reset the offset to the largest offset
-     * fail: throw exception to the consumer
+     * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
+     * The default class is kafka.serializer.DefaultEncoder
      */
-    public void setAutoOffsetReset(String autoOffsetReset) {
-        this.autoOffsetReset = autoOffsetReset;
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
     }
 
-    public Integer getConsumerTimeoutMs() {
-        return consumerTimeoutMs;
+    public String getKeySerializerClass() {
+        if (keySerializerClass == null) {
+            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        }
+        return keySerializerClass;
     }
 
     /**
-     * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
+     * The serializer class for keys (defaults to the same as for messages if nothing is given).
      */
-    public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
-        this.consumerTimeoutMs = consumerTimeoutMs;
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
     }
 
-    public Integer getZookeeperSessionTimeoutMs() {
-        return zookeeperSessionTimeoutMs;
+    public String getKerberosInitCmd() {
+        return kerberosInitCmd;
     }
 
     /**
-     * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
+     * Kerberos kinit command path. Default is /usr/bin/kinit
      */
-    public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
-        this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+    public void setKerberosInitCmd(String kerberosInitCmd) {
+        this.kerberosInitCmd = kerberosInitCmd;
     }
 
-    public Integer getZookeeperConnectionTimeoutMs() {
-        return zookeeperConnectionTimeoutMs;
+    public Integer getKerberosBeforeReloginMinTime() {
+        return kerberosBeforeReloginMinTime;
     }
 
     /**
-     * The max time that the client waits while establishing a connection to zookeeper.
+     * Login thread sleep time between refresh attempts.
      */
-    public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
-        this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+    public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+        this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
     }
 
-    public Integer getZookeeperSyncTimeMs() {
-        return zookeeperSyncTimeMs;
+    public Double getKerberosRenewJitter() {
+        return kerberosRenewJitter;
     }
 
     /**
-     * How far a ZK follower can be behind a ZK leader
+     * Percentage of random jitter added to the renewal time.
      */
-    public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
-        this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+    public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+        this.kerberosRenewJitter = kerberosRenewJitter;
     }
 
-    public String getBrokers() {
-        return brokers;
+    public Double getKerberosRenewWindowFactor() {
+        return kerberosRenewWindowFactor;
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
-     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
-     * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * Login thread will sleep until the specified window factor of time from last
+     * refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.
      */
-    public void setBrokers(String brokers) {
-        this.brokers = brokers;
+    public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+        this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
     }
 
-    public String getProducerType() {
-        return producerType;
+    public String getSslCipherSuites() {
+        return sslCipherSuites;
     }
 
     /**
-     * This parameter specifies whether the messages are sent asynchronously in a background thread.
-     * Valid values are (1) async for asynchronous send and (2) sync for synchronous send.
-     * By setting the producer to async we allow batching together of requests (which is great for throughput)
-     * but open the possibility of a failure of the client machine dropping unsent data.
+     * A list of cipher suites. This is a named combination of authentication, encryption,
+     * MAC and key exchange algorithm used to negotiate the security settings for a network connection
+     * using TLS or SSL network protocol.By default all the available cipher suites are supported.
      */
-    public void setProducerType(String producerType) {
-        this.producerType = producerType;
+    public void setSslCipherSuites(String sslCipherSuites) {
+        this.sslCipherSuites = sslCipherSuites;
     }
 
-    public String getCompressionCodec() {
-        return compressionCodec;
+    public String getSslEndpointAlgorithm() {
+        return sslEndpointAlgorithm;
     }
 
     /**
-     * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
+     * The endpoint identification algorithm to validate server hostname using server certificate.
      */
-    public void setCompressionCodec(String compressionCodec) {
-        this.compressionCodec = compressionCodec;
+    public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+        this.sslEndpointAlgorithm = sslEndpointAlgorithm;
     }
 
-    public String getCompressedTopics() {
-        return compressedTopics;
+    public String getSslKeymanagerAlgorithm() {
+        return sslKeymanagerAlgorithm;
     }
 
     /**
-     * This parameter allows you to set whether compression should be turned on for particular topics.
-     * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any.
-     * If the list of compressed topics is empty, then enable the specified compression codec for all topics.
-     * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+     * The algorithm used by key manager factory for SSL connections. Default value is the key
+     * manager factory algorithm configured for the Java Virtual Machine.
      */
-    public void setCompressedTopics(String compressedTopics) {
-        this.compressedTopics = compressedTopics;
+    public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+        this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
     }
 
-    public Integer getMessageSendMaxRetries() {
-        return messageSendMaxRetries;
+    public String getSslTrustmanagerAlgorithm() {
+        return sslTrustmanagerAlgorithm;
     }
 
     /**
-     * This property will cause the producer to automatically retry a failed send request.
-     * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here
-     * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
+     * The algorithm used by trust manager factory for SSL connections. Default value is the
+     * trust manager factory algorithm configured for the Java Virtual Machine.
      */
-    public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
-        this.messageSendMaxRetries = messageSendMaxRetries;
+    public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+        this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
     }
 
-    public Integer getRetryBackoffMs() {
-        return retryBackoffMs;
+    public String getSslEnabledProtocols() {
+        return sslEnabledProtocols;
     }
 
     /**
-     * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
-     * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
+     * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.
      */
-    public void setRetryBackoffMs(Integer retryBackoffMs) {
-        this.retryBackoffMs = retryBackoffMs;
+    public void setSslEnabledProtocols(String sslEnabledProtocols) {
+        this.sslEnabledProtocols = sslEnabledProtocols;
     }
 
-    public Integer getTopicMetadataRefreshIntervalMs() {
-        return topicMetadataRefreshIntervalMs;
+    public String getSslKeystoreType() {
+        return sslKeystoreType;
     }
 
     /**
-     * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing,
-     * leader not available...). It will also poll regularly (default: every 10min so 600000ms).
-     * If you set this to a negative value, metadata will only get refreshed on failure.
-     * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
-     * Important note: the refresh happen only AFTER the message is sent, so if the producer never
-     * sends a message the metadata is never refreshed
+     * The file format of the key store file. This is optional for client. Default value is JKS
      */
-    public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
-        this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+    public void setSslKeystoreType(String sslKeystoreType) {
+        this.sslKeystoreType = sslKeystoreType;
     }
 
-    public Integer getSendBufferBytes() {
-        return sendBufferBytes;
+    public String getSslProtocol() {
+        return sslProtocol;
     }
 
     /**
-     * Socket write buffer size
+     * The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases.
+     * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs,
+     * but their usage is discouraged due to known security vulnerabilities.
      */
-    public void setSendBufferBytes(Integer sendBufferBytes) {
-        this.sendBufferBytes = sendBufferBytes;
+    public void setSslProtocol(String sslProtocol) {
+        this.sslProtocol = sslProtocol;
+    }
+
+    public String getSslProvider() {
+        return sslProvider;
+    }
+
+    /**
+     * The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
+     */
+    public void setSslProvider(String sslProvider) {
+        this.sslProvider = sslProvider;
+    }
+
+    public String getSslTruststoreType() {
+        return sslTruststoreType;
+    }
+
+    /**
+     * The file format of the trust store file. Default value is JKS.
+     */
+    public void setSslTruststoreType(String sslTruststoreType) {
+        this.sslTruststoreType = sslTruststoreType;
+    }
+
+    public String getSaslKerberosServiceName() {
+        return saslKerberosServiceName;
+    }
+
+    /**
+     * The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS
+     * config or in Kafka's config.
+     */
+    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+        this.saslKerberosServiceName = saslKerberosServiceName;
+    }
+
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+    /**
+     * Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
+     */
+    public void setSecurityProtocol(String securityProtocol) {
+        this.securityProtocol = securityProtocol;
+    }
+
+    public String getSslKeyPassword() {
+        return sslKeyPassword;
+    }
+
+    /**
+     * The password of the private key in the key store file. This is optional for client.
+     */
+    public void setSslKeyPassword(String sslKeyPassword) {
+        this.sslKeyPassword = sslKeyPassword;
+    }
+
+    public String getSslKeystoreLocation() {
+        return sslKeystoreLocation;
     }
 
-    public short getRequestRequiredAcks() {
+    /**
+     * The location of the key store file. This is optional for client and can be used for two-way
+     * authentication for client.
+     */
+    public void setSslKeystoreLocation(String sslKeystoreLocation) {
+        this.sslKeystoreLocation = sslKeystoreLocation;
+    }
+
+    public String getSslKeystorePassword() {
+        return sslKeystorePassword;
+    }
+
+    /**
+     * The store password for the key store file.This is optional for client and only needed
+     * if ssl.keystore.location is configured.
+     */
+    public void setSslKeystorePassword(String sslKeystorePassword) {
+        this.sslKeystorePassword = sslKeystorePassword;
+    }
+
+    public String getSslTruststoreLocation() {
+        return sslTruststoreLocation;
+    }
+
+    /**
+     * The location of the trust store file.
+     */
+    public void setSslTruststoreLocation(String sslTruststoreLocation) {
+        this.sslTruststoreLocation = sslTruststoreLocation;
+    }
+
+    public String getSslTruststorePassword() {
+        return sslTruststorePassword;
+    }
+
+
+    /**
+     * The password for the trust store file.
+     */
+    public void setSslTruststorePassword(String sslTruststorePassword) {
+        this.sslTruststorePassword = sslTruststorePassword;
+    }
+
+    public Integer getBufferMemorySize() {
+        return bufferMemorySize;
+    }
+
+    /**
+     * The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+     * If records are sent faster than they can be delivered to the server the producer will either block
+     * or throw an exception based on the preference specified by block.on.buffer.full.This setting should
+     * correspond roughly to the total memory the producer will use, but is not a hard bound since not all
+     * memory the producer uses is used for buffering. Some additional memory will be used for compression
+     * (if compression is enabled) as well as for maintaining in-flight requests.
+     */
+    public void setBufferMemorySize(Integer bufferMemorySize) {
+        this.bufferMemorySize = bufferMemorySize;
+    }
+
+    public Boolean getBlockOnBufferFull() {
+        return blockOnBufferFull;
+    }
+
+    /**
+     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
+     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
+     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
+     * a BufferExhaustedException if a recrord is sent and the buffer space is full.
+     */
+    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+        this.blockOnBufferFull = blockOnBufferFull;
+    }
+
+    public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
 
     /**
-     * This value controls when a produce request is considered completed. Specifically,
-     * how many other brokers must have committed the data to their log and acknowledged this to the leader?
-     * Typical values are (0, 1 or -1):
-     * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
-     * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
-     * 1, which means that the producer gets an acknowledgement after the leader replica has received the data.
-     * This option provides better durability as the client waits until the server acknowledges the request as successful
-     * (only messages that were written to the now-dead leader but not yet replicated will be lost).
-     * -1, The producer gets an acknowledgement after all in-sync replicas have received the data.
-     * This option provides the greatest level of durability.
-     * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may,
-     * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas
-     * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting.
-     * Please read the Replication section of the design documentation for a more in-depth discussion.
+     * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+     * This controls the durability of records that are sent. The following settings are common:
+     * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all.
+     * The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server
+     * has received the record in this case, and the retries configuration will not take effect (as the client won't generally
+     * know of any failures). The offset given back for each record will always be set to -1.
+     * acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement
+     * from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have
+     * replicated it then the record will be lost.
+     * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
+     * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
      */
-    public void setRequestRequiredAcks(short requestRequiredAcks) {
+    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
 
-    public Integer getRequestTimeoutMs() {
-        return requestTimeoutMs;
+    public Integer getRetries() {
+        return retries;
     }
 
     /**
-     * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+     * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
+     * Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially
+     * change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second
+     * succeeds, then the second record may appear first.
      */
-    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
-        this.requestTimeoutMs = requestTimeoutMs;
+    public void setRetries(Integer retries) {
+        this.retries = retries;
     }
 
-    public Integer getQueueBufferingMaxMs() {
-        return queueBufferingMaxMs;
+    public Integer getProducerBatchSize() {
+        return producerBatchSize;
     }
 
     /**
-     * Maximum time to buffer data when using async mode.
-     * For example a setting of 100 will try to batch together 100ms of messages to send at once.
-     * This will improve throughput but adds message delivery latency due to the buffering.
+     * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+     * This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
+     * No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches, one for each
+     * partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero
+     * will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the
+     * specified batch size in anticipation of additional records.
      */
-    public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
-        this.queueBufferingMaxMs = queueBufferingMaxMs;
+    public void setProducerBatchSize(Integer producerBatchSize) {
+        this.producerBatchSize = producerBatchSize;
     }
 
-    public Integer getQueueBufferingMaxMessages() {
-        return queueBufferingMaxMessages;
+    public Integer getConnectionMaxIdleMs() {
+        return connectionMaxIdleMs;
     }
 
     /**
-     * The maximum number of unsent messages that can be queued up the producer when using async
-     * mode before either the producer must be blocked or data must be dropped.
+     * Close idle connections after the number of milliseconds specified by this config.
      */
-    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
-        this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+    public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+        this.connectionMaxIdleMs = connectionMaxIdleMs;
     }
 
-    public Integer getQueueEnqueueTimeoutMs() {
-        return queueEnqueueTimeoutMs;
+    public Integer getLingerMs() {
+        return lingerMs;
     }
 
     /**
-     * The amount of time to block before dropping messages when running in async mode and the buffer has reached
-     * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full
-     * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.
+     * The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this
+     * occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce
+     * the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is,
+     * rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that
+     * the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on
+     * the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting,
+     * however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more
+     * records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the
+     * number of requests sent but would add up to 5ms of latency to records sent in the absense of load.
      */
-    public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
-        this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+    public void setLingerMs(Integer lingerMs) {
+        this.lingerMs = lingerMs;
     }
 
-    public Integer getBatchNumMessages() {
-        return batchNumMessages;
+    public Integer getMaxBlockMs() {
+        return maxBlockMs;
     }
 
     /**
-     * The number of messages to send in one batch when using async mode.
-     * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.
+     * The configuration controls how long sending to kafka will block. These methods can be
+     * blocked for multiple reasons. For e.g: buffer full, metadata unavailable.This configuration imposes maximum limit on the total time spent
+     * in fetching metadata, serialization of key and value, partitioning and allocation of buffer memory when doing a send(). In case of
+     * partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata
      */
-    public void setBatchNumMessages(Integer batchNumMessages) {
-        this.batchNumMessages = batchNumMessages;
+    public void setMaxBlockMs(Integer maxBlockMs) {
+        this.maxBlockMs = maxBlockMs;
     }
 
-    public String getSerializerClass() {
-        return serializerClass;
+    public Integer getMaxRequestSize() {
+        return maxRequestSize;
     }
 
     /**
-     * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
-     * The default class is kafka.serializer.DefaultEncoder
+     * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size
+     * which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid
+     * sending huge requests.
      */
-    public void setSerializerClass(String serializerClass) {
-        this.serializerClass = serializerClass;
+    public void setMaxRequestSize(Integer maxRequestSize) {
+        this.maxRequestSize = maxRequestSize;
     }
 
-    public String getKeySerializerClass() {
-        return keySerializerClass;
+    public Integer getReceiveBufferBytes() {
+        return receiveBufferBytes;
     }
 
     /**
-     * The serializer class for keys (defaults to the same as for messages if nothing is given).
+     * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
      */
-    public void setKeySerializerClass(String keySerializerClass) {
-        this.keySerializerClass = keySerializerClass;
+    public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+        this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public String getOffsetsStorage() {
-        return offsetsStorage;
+    public Integer getTimeoutMs() {
+        return timeoutMs;
     }
 
     /**
-     * Select where offsets should be stored (zookeeper or kafka).
+     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
+     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
+     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
+     * the network latency of the request.
      */
-    public void setOffsetsStorage(String offsetsStorage) {
-        this.offsetsStorage = offsetsStorage;
+    public void setTimeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
     }
 
-    public Boolean isDualCommitEnabled() {
-        return dualCommitEnabled;
+    public Integer getMaxInFlightRequest() {
+        return maxInFlightRequest;
     }
 
     /**
-     * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
-     * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
-     * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
-     * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+     * The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting
+     * is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
      */
-    public void setDualCommitEnabled(Boolean dualCommitEnabled) {
-        this.dualCommitEnabled = dualCommitEnabled;
+    public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+        this.maxInFlightRequest = maxInFlightRequest;
+    }
+
+    public Integer getMetadataFetchTimeoutMs() {
+        return metadataFetchTimeoutMs;
     }
 
     /**
-     * Returns a copy of this configuration
+     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
+     * This fetch to succeed before throwing an exception back to the client.
      */
-    public KafkaConfiguration copy() {
-        try {
-            return (KafkaConfiguration)clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeCamelException(e);
-        }
+    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
+    }
+
+    public Integer getMetadataMaxAgeMs() {
+        return metadataMaxAgeMs;
+    }
+
+    /**
+     * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership
+     * changes to proactively discover any new brokers or partitions.
+     */
+    public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+        this.metadataMaxAgeMs = metadataMaxAgeMs;
+    }
+
+    public String getMetricReporters() {
+        return metricReporters;
+    }
+
+    /**
+     * A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be
+     * notified of new metric creation. The JmxReporter is always included to register JMX statistics.
+     */
+    public void setMetricReporters(String metricReporters) {
+        this.metricReporters = metricReporters;
+    }
+
+    public Integer getNoOfMetricsSample() {
+        return noOfMetricsSample;
+    }
+
+    /**
+     * The number of samples maintained to compute metrics.
+     */
+    public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+        this.noOfMetricsSample = noOfMetricsSample;
+    }
+
+    public Integer getMetricsSampleWindowMs() {
+        return metricsSampleWindowMs;
+    }
+
+    /**
+     * The number of samples maintained to compute metrics.
+     */
+    public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+        this.metricsSampleWindowMs = metricsSampleWindowMs;
+    }
+
+    public Integer getReconnectBackoffMs() {
+        return reconnectBackoffMs;
+    }
+
+    /**
+     * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host
+     * in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+     */
+    public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+        this.reconnectBackoffMs = reconnectBackoffMs;
+    }
+
+    public Integer getHeartbeatIntervalMs() {
+        return heartbeatIntervalMs;
+    }
+
+    /**
+     * The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
+     * Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new
+     * consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set
+     * no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
+     */
+    public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+        this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    public Integer getMaxPartitionFetchBytes() {
+        return maxPartitionFetchBytes;
+    }
+
+    /**
+     * The maximum amount of data per-partition the server will return. The maximum total memory used for
+     * a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the
+     * maximum message size the server allows or else it is possible for the producer to send messages larger
+     * than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message
+     * on a certain partition.
+     */
+    public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+        this.maxPartitionFetchBytes = maxPartitionFetchBytes;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * The timeout used to detect failures when using Kafka's group management facilities.
+     */
+    public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+        this.sessionTimeoutMs = sessionTimeoutMs;
+    }
+
+    public String getPartitionAssignor() {
+        return partitionAssignor;
+    }
+
+    /**
+     * The class name of the partition assignment strategy that the client will use to distribute
+     * partition ownership amongst consumer instances when group management is used
+     */
+    public void setPartitionAssignor(String partitionAssignor) {
+        this.partitionAssignor = partitionAssignor;
+    }
+
+    public Integer getConsumerRequestTimeoutMs() {
+        return consumerRequestTimeoutMs;
+    }
+
+    /**
+     * The configuration controls the maximum amount of time the client will wait for the response
+     * of a request. If the response is not received before the timeout elapses the client will resend
+     * the request if necessary or fail the request if retries are exhausted.
+     */
+    public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+        this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
+    }
+
+    public Boolean getCheckCrcs() {
+        return checkCrcs;
+    }
+
+    /**
+     * Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
+     * corruption to the messages occurred. This check adds some overhead, so it may be disabled in
+     * cases seeking extreme performance.
+     */
+    public void setCheckCrcs(Boolean checkCrcs) {
+        this.checkCrcs = checkCrcs;
+    }
+
+    public String getKeyDeserializer() {
+        return keyDeserializer;
+    }
+
+    /**
+     * Deserializer class for key that implements the Deserializer interface.
+     */
+    public void setKeyDeserializer(String keyDeserializer) {
+        this.keyDeserializer = keyDeserializer;
+    }
+
+    public String getValueDeserializer() {
+        return valueDeserializer;
+    }
+
+    /**
+     * Deserializer class for value that implements the Deserializer interface.
+     */
+    public void setValueDeserializer(String valueDeserializer) {
+        this.valueDeserializer = valueDeserializer;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index d3ff482..db99a09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -16,21 +16,20 @@
  */
 package org.apache.camel.component.kafka;
 
-/**
- *
- */
 public final class KafkaConstants {
 
-    public static final String DEFAULT_GROUP = "group1";
-
     public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
-    public static final String PARTITION = "kafka.PARTITION";
-    public static final String KEY = "kafka.KEY";
+    public static final String PARTITION = "kafka.EXCHANGE_NAME";
+    public static final String KEY = "kafka.CONTENT_TYPE";
     public static final String TOPIC = "kafka.TOPIC";
     public static final String OFFSET = "kafka.OFFSET";
 
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
     public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+    public static final String KAFKA_DEFAULT_SERIALIZER  = "org.apache.kafka.common.serialization.StringSerializer";
+    public static final String KAFKA_DEFAULT_DESERIALIZER  = "org.apache.kafka.common.serialization.StringDeserializer";
+    public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+    public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor";
 
     private KafkaConstants() {
         // Utility class