Posted to commits@camel.apache.org by da...@apache.org on 2016/03/09 10:10:54 UTC

[1/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.

Repository: camel
Updated Branches:
  refs/heads/master 038e1617f -> 621a704c4


http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 7934f41..0bb4740 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,24 +17,22 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
-    
+
     private static final String TOPIC_STRINGS = "test";
     private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
     private static final String TOPIC_BYTES = "testBytes";
@@ -52,15 +50,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
-    private static ConsumerConnector stringsConsumerConn;
-    private static ConsumerConnector bytesConsumerConn;
+    private static KafkaConsumer<String, String> stringsConsumerConn;
+    private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
 
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
-        + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
-        + "&requestRequiredAcks=-1")
+            + "&requestRequiredAcks=-1")
     private Endpoint toStrings;
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
+            + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+            + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
     private Endpoint toBytes;
 
     @Produce(uri = "direct:startStrings")
@@ -69,46 +68,42 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @Produce(uri = "direct:startBytes")
     private ProducerTemplate bytesTemplate;
 
-
     @BeforeClass
     public static void before() {
         Properties stringsProps = new Properties();
-       
-        stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort());
-        stringsProps.put("group.id", GROUP_STRINGS);
-        stringsProps.put("zookeeper.session.timeout.ms", "6000");
-        stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
-        stringsProps.put("zookeeper.sync.time.ms", "200");
-        stringsProps.put("auto.commit.interval.ms", "1000");
-        stringsProps.put("auto.offset.reset", "smallest");
-        stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps));
+
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        stringsConsumerConn = new KafkaConsumer<String, String>(stringsProps);
 
         Properties bytesProps = new Properties();
         bytesProps.putAll(stringsProps);
         bytesProps.put("group.id", GROUP_BYTES);
-        bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps));
+        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        bytesConsumerConn = new KafkaConsumer<byte[], byte[]>(bytesProps);
     }
 
     @AfterClass
     public static void after() {
-        stringsConsumerConn.shutdown();
-        bytesConsumerConn.shutdown();
+        stringsConsumerConn.close();
+        bytesConsumerConn.close();
     }
 
     @Override
-    protected RouteBuilder[] createRouteBuilders() throws Exception {
-        return new RouteBuilder[] {
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startStrings").to(toStrings);
-                }
-            },
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startBytes").to(toBytes);
-                }
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:startStrings").to(toStrings);
+
+                from("direct:startBytes").to(toBytes);
             }
         };
     }
@@ -120,14 +115,11 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_STRINGS, 5);
-        topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
-        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
-
         sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
 
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
@@ -140,11 +132,6 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_BYTES, 5);
-        topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
-        createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
-
         Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
         inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
         sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
@@ -154,22 +141,47 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
         sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
 
+        createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
-    private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader,
-                                            CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap);
+    private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,
+                                            String topic, String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
 
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        while (run) {
+            ConsumerRecords<String, String> records = consumerConn.poll(100);
+            for (ConsumerRecord<String, String> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+
+    }
+
+    private void createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic,
+                                                 String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
+
+        while (run) {
+            ConsumerRecords<byte[], byte[]> records = consumerConn.poll(100);
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
+
     }
 
     private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
@@ -186,23 +198,4 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         }
     }
 
-    private static class KakfaTopicConsumer implements Runnable {
-        private final KafkaStream<byte[], byte[]> stream;
-        private final CountDownLatch latch;
-
-        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
-            this.stream = stream;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            while (it.hasNext()) {
-                String msg = new String(it.next().message());
-                LOG.info("Get the message" + msg);
-                latch.countDown();
-            }
-        }
-    }
 }
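
For context, the migration pattern above (ConsumerConnector plus per-stream consumer threads replaced by a single poll loop on the new java client) can be shown standalone. A minimal sketch against kafka-clients 0.9.x follows; the bootstrap address, group id and topic are illustrative and not taken from the commit:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative
            props.put("group.id", "demo-group");              // illustrative
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");

            // The java client is single-threaded by contract: one consumer,
            // one poll loop, instead of the old KafkaStream thread pool.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            try {
                consumer.subscribe(Arrays.asList("test"));
                for (int i = 0; i < 10; i++) { // bounded only to keep the sketch finite
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.topic() + "-" + record.partition()
                                + "@" + record.offset() + ": " + record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }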

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index f2bcd6b..98f6421 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -41,18 +41,17 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+        endpoint = new KafkaEndpoint(
+                "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
-        producer.producer = Mockito.mock(Producer.class);
+        producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
     }
 
     @Test
     public void testPropertyBuilder() throws Exception {
-        endpoint.setPartitioner("com.sksamuel.someclass");
         Properties props = producer.getProps();
-        assertEquals("com.sksamuel.someclass", props.getProperty("partitioner.class"));
-        assertEquals("broker1:1234,broker2:4567", props.getProperty("metadata.broker.list"));
+        assertEquals("broker1:1234,broker2:4567", props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
     }
 
     @Test
@@ -63,20 +62,18 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
-
-        Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", "4");
+        verifySendMessage("anotherTopic");
     }
 
     @Test
@@ -112,10 +109,9 @@ public class KafkaProducerTest {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
-
+        in.setHeader(KafkaConstants.KEY, "someKey");
         producer.process(exchange);
-
-        verifySendMessage("4", "someTopic", "4");
+        verifySendMessage("4", "someTopic", "someKey");
     }
 
     @Test
@@ -126,9 +122,9 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("someKey", "someTopic", "someKey");
+        verifySendMessage("someTopic", "someKey");
     }
-    
+
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.setTopic("someTopic");
@@ -136,19 +132,44 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
-        
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        producer.process(exchange);
+
+        verifySendMessage("4", "someTopic", "someKey");
+    }
+
+    @Test // Message and Topic Name alone
+    public void processSendsMessageWithMessageTopicName() throws Exception {
+        endpoint.setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
         producer.process(exchange);
-        
-        verifySendMessage("someKey", "someTopic", "someKey");
+
+        verifySendMessage("someTopic");
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String messageKey) {
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals(partitionKey, captor.getValue().partitionKey());
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(new Integer(partitionKey), captor.getValue().partition());
+        assertEquals(messageKey, captor.getValue().key());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic, String messageKey) {
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
         assertEquals(messageKey, captor.getValue().key());
         assertEquals(topic, captor.getValue().topic());
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic) {
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
 }
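
The verifySendMessage() helpers above rely on a standard Mockito capture of the new ProducerRecord. A self-contained sketch of the same pattern (topic, partition and key values are illustrative, not from the commit):

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.mockito.ArgumentCaptor;
    import org.mockito.Mockito;

    public class CaptureSketch {
        @SuppressWarnings({"unchecked", "rawtypes"})
        public static void main(String[] args) {
            Producer<String, String> mock = Mockito.mock(Producer.class);
            // ProducerRecord carries topic, partition and key directly,
            // where the old KeyedMessage exposed a partitionKey().
            mock.send(new ProducerRecord<String, String>("someTopic", 4, "someKey", "body"));

            ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
            Mockito.verify(mock).send(captor.capture());
            System.out.println(captor.getValue().topic());     // someTopic
            System.out.println(captor.getValue().partition()); // 4
            System.out.println(captor.getValue().key());       // someKey
        }
    }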

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
deleted file mode 100644
index 039a2e7..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class SimplePartitioner implements Partitioner {
-
-    public SimplePartitioner(VerifiableProperties props) {
-    }
-
-    /**
-     * Uses the key to calculate a partition bucket id for routing
-     * the data to the appropriate broker partition
-     *
-     * @return an integer between 0 and numPartitions-1
-     */
-    @Override
-    public int partition(Object key, int numPartitions) {
-        return key.hashCode() % numPartitions;
-    }
-
-}
-
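
The deleted class implemented the old scala kafka.producer.Partitioner, which has no direct equivalent in the java client. A hypothetical replacement, not part of this commit, would implement org.apache.kafka.clients.producer.Partitioner instead; a sketch:

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class SimpleClientPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
            // no configuration needed for this sketch
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // Math.abs guards against negative hash codes, which the old
            // key.hashCode() % numPartitions version did not.
            return key == null ? 0 : Math.abs(key.hashCode()) % numPartitions;
        }

        @Override
        public void close() {
            // nothing to release
        }
    }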

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index ce11a47..42403c2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -27,13 +27,9 @@ import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.ZkUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
 
 public class EmbeddedKafkaCluster {
-    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-
     private final List<Integer> ports;
     private final String zkConnection;
     private final Properties baseProperties;
@@ -68,7 +64,7 @@ public class EmbeddedKafkaCluster {
         return null;
     }
 
-    public void createTopics(String...topics) {
+    public void createTopics(String... topics) {
         for (String topic : topics) {
             AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
         }
@@ -112,10 +108,10 @@ public class EmbeddedKafkaCluster {
             properties.setProperty("host.name", "localhost");
             properties.setProperty("port", Integer.toString(port));
             properties.setProperty("log.dir", logDir.getAbsolutePath());
-            properties.setProperty("num.partitions",  String.valueOf(1));
-            properties.setProperty("auto.create.topics.enable",  String.valueOf(Boolean.TRUE));
+            properties.setProperty("num.partitions", String.valueOf(1));
+            properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+            System.out.println("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
             properties.setProperty("log.flush.interval.messages", String.valueOf(1));
-            LOG.info("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
 
             KafkaServer broker = startBroker(properties);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j.properties b/components/camel-kafka/src/test/resources/log4j.properties
index 67458fd..44266d1 100644
--- a/components/camel-kafka/src/test/resources/log4j.properties
+++ b/components/camel-kafka/src/test/resources/log4j.properties
@@ -32,4 +32,4 @@ log4j.appender.out=org.apache.log4j.FileAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 log4j.appender.out.file=target/camel-kafka-test.log
-log4j.appender.out.append=true
+log4j.appender.out.append=true
\ No newline at end of file


[3/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.

Posted by da...@apache.org.
CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2aa831d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2aa831d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2aa831d

Branch: refs/heads/master
Commit: b2aa831da8c8f78f7d6ca908c5b33957bbc7fa24
Parents: 038e161
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 09:58:36 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:01:59 2016 +0100

----------------------------------------------------------------------
 components/camel-kafka/pom.xml                  |   26 +-
 .../camel/component/kafka/KafkaComponent.java   |   22 +-
 .../component/kafka/KafkaConfiguration.java     | 1204 ++++++++++++------
 .../camel/component/kafka/KafkaConstants.java   |   13 +-
 .../camel/component/kafka/KafkaConsumer.java    |  183 +--
 .../camel/component/kafka/KafkaEndpoint.java    |  598 +++++----
 .../camel/component/kafka/KafkaProducer.java    |   64 +-
 .../component/kafka/BaseEmbeddedKafkaTest.java  |   21 +-
 .../component/kafka/KafkaComponentTest.java     |  224 ++--
 .../kafka/KafkaConsumerBatchSizeTest.java       |   64 +-
 .../component/kafka/KafkaConsumerFullTest.java  |   34 +-
 .../component/kafka/KafkaConsumerTest.java      |    8 +-
 .../component/kafka/KafkaEndpointTest.java      |   13 +-
 .../component/kafka/KafkaProducerFullTest.java  |  145 +--
 .../component/kafka/KafkaProducerTest.java      |   65 +-
 .../component/kafka/SimplePartitioner.java      |   39 -
 .../kafka/embedded/EmbeddedKafkaCluster.java    |   12 +-
 .../src/test/resources/log4j.properties         |    2 +-
 18 files changed, 1618 insertions(+), 1119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 1a5f98f..f8bbdb5 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -15,7 +15,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
@@ -37,31 +38,35 @@
   </properties>
 
   <dependencies>
+
+    <!-- camel -->
     <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-core</artifactId>
     </dependency>
+
+    <!-- kafka java client -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka-version}</version>
+    </dependency>
+
+    <!-- kafka server for testing using scala -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>${kafka-version}</version>
+      <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-simple</artifactId>
         </exclusion>
-          <exclusion>
-              <artifactId>scala-library</artifactId>
-              <groupId>org.scala-lang</groupId>
-          </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
 
+    <!-- test -->
     <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-test</artifactId>
@@ -77,6 +82,7 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c9d4c2a..2981b3f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -20,8 +20,6 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.UriEndpointComponent;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.EndpointHelper;
 
 public class KafkaComponent extends UriEndpointComponent {
 
@@ -34,30 +32,14 @@ public class KafkaComponent extends UriEndpointComponent {
     }
 
     @Override
-    protected KafkaEndpoint createEndpoint(String uri,
-                                           String remaining,
-                                           Map<String, Object> params) throws Exception {
-
+    protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
         String brokers = remaining.split("\\?")[0];
-        Object confparam = params.get("configuration");
-        if (confparam != null) {
-            // need a special handling to resolve the reference before other parameters are set/merged into the config
-            KafkaConfiguration confobj = null;
-            if (confparam instanceof KafkaConfiguration) {
-                confobj = (KafkaConfiguration)confparam;
-            } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) { 
-                confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1));
-            }
-            if (confobj != null) {
-                endpoint.setConfiguration(confobj.copy());
-            }
-            params.remove("configuration");
-        }
         if (brokers != null) {
             endpoint.getConfiguration().setBrokers(brokers);
         }
         setProperties(endpoint, params);
         return endpoint;
     }
+
 }
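
With the configuration-reference handling removed, the component resolves the broker list straight from the URI path. A minimal usage sketch (broker address and topic are illustrative, not from the commit):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;

    public class KafkaRouteSketch {
        public static void main(String[] args) throws Exception {
            DefaultCamelContext context = new DefaultCamelContext();
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // brokers come from the URI path, options from the query
                    from("direct:start")
                        .to("kafka:localhost:9092?topic=test&requestRequiredAcks=1");
                }
            });
            context.start();
            context.createProducerTemplate().sendBody("direct:start", "hello");
            context.stop();
        }
    }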

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 894df0c..4a948c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -18,28 +18,28 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.producer.DefaultPartitioner;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 
 @UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration {
+
+    @UriPath @Metadata(required = "true")
+    private String brokers;
 
-    @UriParam
-    private String zookeeperConnect;
-    @UriParam
-    private String zookeeperHost;
-    @UriParam(defaultValue = "2181")
-    private int zookeeperPort = 2181;
     @UriParam @Metadata(required = "true")
     private String topic;
     @UriParam
     private String groupId;
-    @UriParam(defaultValue = "DefaultPartitioner")
-    private String partitioner = DefaultPartitioner.class.getCanonicalName();
+    @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
+    private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
     @UriParam(label = "consumer", defaultValue = "10")
     private int consumerStreams = 10;
     @UriParam(label = "consumer", defaultValue = "1")
@@ -53,134 +53,302 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam
     private String clientId;
 
+    //key.deserializer
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+    private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+    //value.deserializer
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+    private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+    //fetch.min.bytes
+    @UriParam(label = "consumer", defaultValue = "1024")
+    private Integer fetchMinBytes = 1024;
+    //heartbeat.interval.ms
+    @UriParam(label = "consumer", defaultValue = "3000")
+    private Integer heartbeatIntervalMs = 3000;
+    //max.partition.fetch.bytes
+    @UriParam(label = "consumer", defaultValue = "1048576")
+    private Integer maxPartitionFetchBytes = 1048576;
+    //session.timeout.ms
+    @UriParam(label = "consumer", defaultValue = "30000")
+    private Integer sessionTimeoutMs = 30000;
+    //auto.offset.reset
+    @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
+    private String autoOffsetReset = "latest";
+    //partition.assignment.strategy
+    @UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR)
+    private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
+    //request.timeout.ms
+    @UriParam(label = "consumer", defaultValue = "40000")
+    private Integer consumerRequestTimeoutMs = 40000;
+    //auto.commit.interval.ms
+    @UriParam(label = "consumer", defaultValue = "5000")
+    private Integer autoCommitIntervalMs = 5000;
+    //check.crcs
+    @UriParam(label = "consumer", defaultValue = "true")
+    private Boolean checkCrcs = true;
+    //fetch.max.wait.ms
+    @UriParam(label = "consumer", defaultValue = "500")
+    private Integer fetchWaitMaxMs = 500;
+
     //Consumer configuration properties
     @UriParam(label = "consumer")
     private String consumerId;
-    @UriParam(label = "consumer", defaultValue = "30000")
-    private Integer socketTimeoutMs = 30 * 1000;
-    @UriParam(label = "consumer", defaultValue = "" + 64 * 1024)
-    private Integer socketReceiveBufferBytes = 64 * 1024;
-    @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024)
-    private Integer fetchMessageMaxBytes = 1024 * 1024;
     @UriParam(label = "consumer", defaultValue = "true")
     private Boolean autoCommitEnable = true;
-    @UriParam(label = "consumer", defaultValue = "60000")
-    private Integer autoCommitIntervalMs = 60 * 1000;
-    @UriParam(label = "consumer", defaultValue = "2")
-    private Integer queuedMaxMessageChunks = 2;
-    @UriParam(label = "consumer", defaultValue = "4")
-    private Integer rebalanceMaxRetries = 4;
-    @UriParam(label = "consumer", defaultValue = "1")
-    private Integer fetchMinBytes = 1;
-    @UriParam(label = "consumer", defaultValue = "100")
-    private Integer fetchWaitMaxMs = 100;
-    @UriParam(label = "consumer", defaultValue = "2000")
-    private Integer rebalanceBackoffMs = 2000;
-    @UriParam(label = "consumer", defaultValue = "200")
-    private Integer refreshLeaderBackoffMs = 200;
-    @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail")
-    private String autoOffsetReset = "largest";
-    @UriParam(label = "consumer")
-    private Integer consumerTimeoutMs;
-    @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
-    private String offsetsStorage = "zookeeper";
-    @UriParam(label = "consumer", defaultValue = "true")
-    private Boolean dualCommitEnabled = true;
-
-    //Zookeepr configuration properties
-    @UriParam
-    private Integer zookeeperSessionTimeoutMs;
-    @UriParam
-    private Integer zookeeperConnectionTimeoutMs;
-    @UriParam
-    private Integer zookeeperSyncTimeMs;
 
     //Producer configuration properties
-    @UriPath
-    private String brokers;
-    @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
-    private String producerType = "sync";
-    @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy")
-    private String compressionCodec = "none";
-    @UriParam(label = "producer")
-    private String compressedTopics;
-    @UriParam(label = "producer", defaultValue = "3")
-    private Integer messageSendMaxRetries = 3;
     @UriParam(label = "producer", defaultValue = "100")
     private Integer retryBackoffMs = 100;
-    @UriParam(label = "producer", defaultValue = "600000")
-    private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
-
-    //Sync producer config
-    @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
-    private Integer sendBufferBytes = 100 * 1024;
-    @UriParam(label = "producer", defaultValue = "0")
-    private short requestRequiredAcks;
-    @UriParam(label = "producer", defaultValue = "10000")
-    private Integer requestTimeoutMs = 10000;
 
     //Async producer config
-    @UriParam(label = "producer", defaultValue = "5000")
-    private Integer queueBufferingMaxMs = 5000;
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
     @UriParam(label = "producer")
-    private Integer queueEnqueueTimeoutMs;
-    @UriParam(label = "producer", defaultValue = "200")
-    private Integer batchNumMessages = 200;
-    @UriParam(label = "producer")
     private String serializerClass;
     @UriParam(label = "producer")
     private String keySerializerClass;
 
+    @UriParam(label = "producer", defaultValue = "1")
+    private Integer requestRequiredAcks = 1;
+    //buffer.memory
+    @UriParam(label = "producer", defaultValue = "33554432")
+    private Integer bufferMemorySize = 33554432;
+    //compression.type
+    @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4")
+    private String compressionCodec = "none";
+    //retries
+    @UriParam(label = "producer", defaultValue = "0")
+    private Integer retries = 0;
+    // SSL
+    // ssl.key.password
+    @UriParam(label = "producer")
+    private String sslKeyPassword;
+    // ssl.keystore.location
+    @UriParam(label = "producer")
+    private String sslKeystoreLocation;
+    // ssl.keystore.password
+    @UriParam(label = "producer")
+    private String sslKeystorePassword;
+    //ssl.truststore.location
+    @UriParam(label = "producer")
+    private String sslTruststoreLocation;
+    //ssl.truststore.password
+    @UriParam(label = "producer")
+    private String sslTruststorePassword;
+    //batch.size
+    @UriParam(label = "producer", defaultValue = "16384")
+    private Integer producerBatchSize = 16384;
+    //connections.max.idle.ms
+    @UriParam(label = "producer", defaultValue = "540000")
+    private Integer connectionMaxIdleMs = 540000;
+    //linger.ms
+    @UriParam(label = "producer", defaultValue = "0")
+    private Integer lingerMs = 0;
+    //max.block.ms
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer maxBlockMs = 60000;
+    //max.request.size
+    @UriParam(label = "producer", defaultValue = "1048576")
+    private Integer maxRequestSize = 1048576;
+    //receive.buffer.bytes
+    @UriParam(label = "producer", defaultValue = "32768")
+    private Integer receiveBufferBytes = 32768;
+    //request.timeout.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer requestTimeoutMs = 30000;
+    // SASL & security protocol
+    //sasl.kerberos.service.name
+    @UriParam(label = "producer")
+    private String saslKerberosServiceName;
+    //security.protocol
+    @UriParam(label = "producer", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL)
+    private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+    //send.buffer.bytes
+    @UriParam(label = "producer", defaultValue = "131072")
+    private Integer sendBufferBytes = 131072;
+    //SSL
+    //ssl.enabled.protocols
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)
+    private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+    //ssl.keystore.type
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE)
+    private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+    //ssl.protocol
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_PROTOCOL)
+    private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
+    //ssl.provider
+    @UriParam(label = "producer")
+    private String sslProvider;
+    //ssl.truststore.type
+    @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
+    private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+    //timeout.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer timeoutMs = 30000;
+    //block.on.buffer.full
+    @UriParam(label = "producer", defaultValue = "false")
+    private Boolean blockOnBufferFull = false;
+    //max.in.flight.requests.per.connection
+    @UriParam(label = "producer", defaultValue = "5")
+    private Integer maxInFlightRequest = 5;
+    //metadata.fetch.timeout.ms
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer metadataFetchTimeoutMs = 60000;
+    //metadata.max.age.ms
+    @UriParam(label = "producer", defaultValue = "300000")
+    private Integer metadataMaxAgeMs = 300000;
+    //metric.reporters
+    @UriParam(label = "producer")
+    private String metricReporters;
+    //metrics.num.samples
+    @UriParam(label = "producer", defaultValue = "2")
+    private Integer noOfMetricsSample = 2;
+    //metrics.sample.window.ms
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer metricsSampleWindowMs = 30000;
+    //reconnect.backoff.ms
+    @UriParam(label = "producer", defaultValue = "50")
+    private Integer reconnectBackoffMs = 50;
+    //SASL
+    //sasl.kerberos.kinit.cmd
+    @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
+    private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
+    //sasl.kerberos.min.time.before.relogin
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer kerberosBeforeReloginMinTime = 60000;
+    //sasl.kerberos.ticket.renew.jitter
+    @UriParam(label = "producer", defaultValue = "0.05")
+    private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
+    //sasl.kerberos.ticket.renew.window.factor
+    @UriParam(label = "producer", defaultValue = "0.8")
+    private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
+    //SSL
+    //ssl.cipher.suites
+    @UriParam(label = "producer")
+    private String sslCipherSuites;
+    //ssl.endpoint.identification.algorithm
+    @UriParam(label = "producer")
+    private String sslEndpointAlgorithm;
+    //ssl.keymanager.algorithm
+    @UriParam(label = "producer", defaultValue = "SunX509")
+    private String sslKeymanagerAlgorithm = "SunX509";
+    //ssl.trustmanager.algorithm
+    @UriParam(label = "producer", defaultValue = "PKIX")
+    private String sslTrustmanagerAlgorithm = "PKIX";
+
     public KafkaConfiguration() {
     }
 
     public Properties createProducerProperties() {
         Properties props = new Properties();
-        addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks());
-        addPropertyIfNotNull(props, "partitioner.class", getPartitioner());
-        addPropertyIfNotNull(props, "serializer.class", getSerializerClass());
-        addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass());
-        addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs());
-        addPropertyIfNotNull(props, "producer.type", getProducerType());
-        addPropertyIfNotNull(props, "compression.codec", getCompressionCodec());
-        addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics());
-        addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries());
-        addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs());
-        addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
-        addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs());
-        addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
-        addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
-        addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages());
-        addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes());
-        addPropertyIfNotNull(props, "client.id", getClientId());
+        addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass());
+        addPropertyIfNotNull(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializerClass());
+        addPropertyIfNotNull(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks());
+        addPropertyIfNotNull(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize());
+        addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
+        addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
+        // SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
+        addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
+        addPropertyIfNotNull(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+        addPropertyIfNotNull(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize());
+        addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
+        addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+        addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+        // Security protocol
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
+        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
+        addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
+        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
+        addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+        addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+        addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+        addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+        addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+        addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+
         return props;
     }
 
     public Properties createConsumerProperties() {
         Properties props = new Properties();
-        addPropertyIfNotNull(props, "consumer.id", getConsumerId());
-        addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
-        addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
-        addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
-        addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
-        addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
-        addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages());
-        addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
-        addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
-        addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks());
-        addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
-        addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
-        addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
-        addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
-        addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
-        addPropertyIfNotNull(props, "client.id", getClientId());
-        addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
-        addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
-        addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
-        addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
-        addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
+        addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
+        addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
+        addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+        addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        // SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+        addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
+        addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+        addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
+        addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+        addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+        // Security protocol
+        addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+        addPropertyIfNotNull(props, ConsumerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+        addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+        addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
+        addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
+        addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
+        addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs());
+        addPropertyIfNotNull(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+        addPropertyIfNotNull(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+        addPropertyIfNotNull(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+        addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+        addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+        addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+        //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+        //SSL
+        addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+        addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+        addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
         return props;
     }
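
For context: the addPropertyIfNotNull helper invoked throughout the block above is not part of this hunk. A minimal sketch, assuming it simply null-guards each option before handing it to the client config (the wrapper class name here is hypothetical):

    import java.util.Properties;

    final class KafkaPropertyUtil {
        private KafkaPropertyUtil() {
        }

        // Skip unset options so the Kafka client falls back to its own defaults;
        // Kafka's ConfigDef parses string values, so stringifying is safe here.
        static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
            if (value != null) {
                props.put(key, value.toString());
            }
        }
    }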
 
@@ -191,65 +359,6 @@ public class KafkaConfiguration implements Cloneable {
         }
     }
 
-    public String getZookeeperConnect() {
-        if (this.zookeeperConnect != null) {
-            return zookeeperConnect;
-        } else {
-            return getZookeeperHost() + ":" + getZookeeperPort();
-        }
-    }
-
-    /**
-     * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.
-     * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the
-     * form hostname1:port1,hostname2:port2,hostname3:port3.
-     * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data
-     * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string.
-     * For example to give a chroot path of /chroot/path you would give the connection
-     * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
-     */
-    public void setZookeeperConnect(String zookeeperConnect) {
-        this.zookeeperConnect = zookeeperConnect;
-
-        // connect overrides host and port
-        this.zookeeperHost = null;
-        this.zookeeperPort = -1;
-    }
-
-    public String getZookeeperHost() {
-        return zookeeperHost;
-    }
-
-    /**
-     * The zookeeper host to use.
-     * <p/>
-     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
-     * <p/>
-     * This option can only be used if zookeeperConnect is not in use.
-     */
-    public void setZookeeperHost(String zookeeperHost) {
-        if (this.zookeeperConnect == null) {
-            this.zookeeperHost = zookeeperHost;
-        }
-    }
-
-    public int getZookeeperPort() {
-        return zookeeperPort;
-    }
-
-    /**
-     * The zookeeper port to use
-     * <p/>
-     * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
-     * <p/>
-     * This option can only be used if zookeeperConnect is not in use.
-     */
-    public void setZookeeperPort(int zookeeperPort) {
-        if (this.zookeeperConnect == null) {
-            this.zookeeperPort = zookeeperPort;
-        }
-    }
-
     public String getGroupId() {
         return groupId;
     }
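
As a usage sketch only (host, port, topic and group id are illustrative, not taken from this commit), a consumer route against the migrated component would combine the brokers, topic and groupId options along these lines:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaConsumerRouteSketch extends RouteBuilder {
        @Override
        public void configure() {
            // groupId is mandatory for the new consumer; autoCommitEnable=false
            // defers offset commits to the component's batch handling
            from("kafka:localhost:9092?topic=test&groupId=testing&autoCommitEnable=false")
                .to("log:received");
        }
    }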
@@ -278,7 +387,7 @@ public class KafkaConfiguration implements Cloneable {
     }
 
     /**
-     * Name of the topic to use
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -351,440 +460,741 @@ public class KafkaConfiguration implements Cloneable {
         this.consumerId = consumerId;
     }
 
-    public Integer getSocketTimeoutMs() {
-        return socketTimeoutMs;
+    public Boolean isAutoCommitEnable() {
+        return autoCommitEnable;
     }
 
     /**
-     * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.
+     * If true, periodically commit to Kafka the offset of messages already fetched by the consumer.
+     * This committed offset will be used when the process fails as the position from which the new consumer will begin.
      */
-    public void setSocketTimeoutMs(Integer socketTimeoutMs) {
-        this.socketTimeoutMs = socketTimeoutMs;
+    public void setAutoCommitEnable(Boolean autoCommitEnable) {
+        this.autoCommitEnable = autoCommitEnable;
     }
 
-    public Integer getSocketReceiveBufferBytes() {
-        return socketReceiveBufferBytes;
+    public Integer getAutoCommitIntervalMs() {
+        return autoCommitIntervalMs;
     }
 
     /**
-     * The socket receive buffer for network requests
+     * The frequency in ms that the consumer offsets are committed to Kafka.
      */
-    public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
-        this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+        this.autoCommitIntervalMs = autoCommitIntervalMs;
     }
 
-    public Integer getFetchMessageMaxBytes() {
-        return fetchMessageMaxBytes;
+    public Integer getFetchMinBytes() {
+        return fetchMinBytes;
     }
 
     /**
-     * The number of byes of messages to attempt to fetch for each topic-partition in each fetch request.
-     * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
-     * The fetch request size must be at least as large as the maximum message size the server allows or else it
-     * is possible for the producer to send messages larger than the consumer can fetch.
+     * The minimum amount of data the server should return for a fetch request.
+     * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
      */
-    public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
-        this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+    public void setFetchMinBytes(Integer fetchMinBytes) {
+        this.fetchMinBytes = fetchMinBytes;
     }
 
-    public Boolean isAutoCommitEnable() {
-        return autoCommitEnable;
+    public Integer getFetchWaitMaxMs() {
+        return fetchWaitMaxMs;
     }
 
     /**
-     * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
-     * This committed offset will be used when the process fails as the position from which the new consumer will begin.
+     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
      */
-    public void setAutoCommitEnable(Boolean autoCommitEnable) {
-        this.autoCommitEnable = autoCommitEnable;
+    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+        this.fetchWaitMaxMs = fetchWaitMaxMs;
     }
 
-    public Integer getAutoCommitIntervalMs() {
-        return autoCommitIntervalMs;
+    public String getAutoOffsetReset() {
+        return autoOffsetReset;
     }
 
     /**
-     * The frequency in ms that the consumer offsets are committed to zookeeper.
+     * What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:
+     * earliest : automatically reset the offset to the earliest offset
+     * latest : automatically reset the offset to the latest offset
+     * none : throw exception to the consumer
      */
-    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
-        this.autoCommitIntervalMs = autoCommitIntervalMs;
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        this.autoOffsetReset = autoOffsetReset;
     }
 
-    public Integer getQueuedMaxMessageChunks() {
-        return queuedMaxMessageChunks;
+    public String getBrokers() {
+        return brokers;
     }
 
     /**
-     * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
+     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
      */
-    public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
-        this.queuedMaxMessageChunks = queuedMaxMessageChunks;
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
     }
 
-    public Integer getRebalanceMaxRetries() {
-        return rebalanceMaxRetries;
+    public String getCompressionCodec() {
+        return compressionCodec;
     }
 
     /**
-     * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
-     * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
-     * This setting controls the maximum number of attempts before giving up.
+     * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
      */
-    public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
-        this.rebalanceMaxRetries = rebalanceMaxRetries;
+    public void setCompressionCodec(String compressionCodec) {
+        this.compressionCodec = compressionCodec;
     }
 
-    public Integer getFetchMinBytes() {
-        return fetchMinBytes;
+    public Integer getRetryBackoffMs() {
+        return retryBackoffMs;
     }
 
     /**
-     * The minimum amount of data the server should return for a fetch request.
-     * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+     * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
+     * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
      */
-    public void setFetchMinBytes(Integer fetchMinBytes) {
-        this.fetchMinBytes = fetchMinBytes;
+    public void setRetryBackoffMs(Integer retryBackoffMs) {
+        this.retryBackoffMs = retryBackoffMs;
     }
 
-    public Integer getFetchWaitMaxMs() {
-        return fetchWaitMaxMs;
+    public Integer getSendBufferBytes() {
+        return sendBufferBytes;
     }
 
     /**
-     * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
+     * Socket write buffer size
      */
-    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
-        this.fetchWaitMaxMs = fetchWaitMaxMs;
+    public void setSendBufferBytes(Integer sendBufferBytes) {
+        this.sendBufferBytes = sendBufferBytes;
     }
 
-    public Integer getRebalanceBackoffMs() {
-        return rebalanceBackoffMs;
+    public Integer getRequestTimeoutMs() {
+        return requestTimeoutMs;
     }
 
     /**
-     * Backoff time between retries during rebalance.
+     * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
      */
-    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
-        this.rebalanceBackoffMs = rebalanceBackoffMs;
+    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+        this.requestTimeoutMs = requestTimeoutMs;
     }
 
-    public Integer getRefreshLeaderBackoffMs() {
-        return refreshLeaderBackoffMs;
+    public Integer getQueueBufferingMaxMessages() {
+        return queueBufferingMaxMessages;
     }
 
     /**
-     * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
+     * The maximum number of unsent messages that can be queued up by the producer when using async
+     * mode before either the producer must be blocked or data must be dropped.
      */
-    public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
-        this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+        this.queueBufferingMaxMessages = queueBufferingMaxMessages;
     }
 
-    public String getAutoOffsetReset() {
-        return autoOffsetReset;
+    public String getSerializerClass() {
+        if (serializerClass == null) {
+            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        }
+        return serializerClass;
     }
 
     /**
-     * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
-     * smallest : automatically reset the offset to the smallest offset
-     * largest : automatically reset the offset to the largest offset
-     * fail: throw exception to the consumer
+     * The serializer class for messages.
+     * The default is org.apache.kafka.common.serialization.StringSerializer (KafkaConstants.KAFKA_DEFAULT_SERIALIZER).
      */
-    public void setAutoOffsetReset(String autoOffsetReset) {
-        this.autoOffsetReset = autoOffsetReset;
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
     }
 
-    public Integer getConsumerTimeoutMs() {
-        return consumerTimeoutMs;
+    public String getKeySerializerClass() {
+        if (keySerializerClass == null) {
+            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+        }
+        return keySerializerClass;
     }
 
     /**
-     * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
+     * The serializer class for keys (defaults to the same as for messages if nothing is given).
      */
-    public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
-        this.consumerTimeoutMs = consumerTimeoutMs;
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
     }
 
-    public Integer getZookeeperSessionTimeoutMs() {
-        return zookeeperSessionTimeoutMs;
+    public String getKerberosInitCmd() {
+        return kerberosInitCmd;
     }
 
     /**
-     * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
+     * Kerberos kinit command path. Default is /usr/bin/kinit
      */
-    public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
-        this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+    public void setKerberosInitCmd(String kerberosInitCmd) {
+        this.kerberosInitCmd = kerberosInitCmd;
     }
 
-    public Integer getZookeeperConnectionTimeoutMs() {
-        return zookeeperConnectionTimeoutMs;
+    public Integer getKerberosBeforeReloginMinTime() {
+        return kerberosBeforeReloginMinTime;
     }
 
     /**
-     * The max time that the client waits while establishing a connection to zookeeper.
+     * Login thread sleep time between refresh attempts.
      */
-    public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
-        this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+    public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+        this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
     }
 
-    public Integer getZookeeperSyncTimeMs() {
-        return zookeeperSyncTimeMs;
+    public Double getKerberosRenewJitter() {
+        return kerberosRenewJitter;
     }
 
     /**
-     * How far a ZK follower can be behind a ZK leader
+     * Percentage of random jitter added to the renewal time.
      */
-    public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
-        this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+    public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+        this.kerberosRenewJitter = kerberosRenewJitter;
     }
 
-    public String getBrokers() {
-        return brokers;
+    public Double getKerberosRenewWindowFactor() {
+        return kerberosRenewWindowFactor;
     }
 
     /**
-     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
-     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
-     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
-     * <p/>
-     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     * Login thread will sleep until the specified window factor of time from last
+     * refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.
      */
-    public void setBrokers(String brokers) {
-        this.brokers = brokers;
+    public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+        this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
     }
 
-    public String getProducerType() {
-        return producerType;
+    public String getSslCipherSuites() {
+        return sslCipherSuites;
     }
 
     /**
-     * This parameter specifies whether the messages are sent asynchronously in a background thread.
-     * Valid values are (1) async for asynchronous send and (2) sync for synchronous send.
-     * By setting the producer to async we allow batching together of requests (which is great for throughput)
-     * but open the possibility of a failure of the client machine dropping unsent data.
+     * A list of cipher suites. This is a named combination of authentication, encryption,
+     * MAC and key exchange algorithm used to negotiate the security settings for a network connection
+     * using the TLS or SSL network protocol. By default all the available cipher suites are supported.
      */
-    public void setProducerType(String producerType) {
-        this.producerType = producerType;
+    public void setSslCipherSuites(String sslCipherSuites) {
+        this.sslCipherSuites = sslCipherSuites;
     }
 
-    public String getCompressionCodec() {
-        return compressionCodec;
+    public String getSslEndpointAlgorithm() {
+        return sslEndpointAlgorithm;
     }
 
     /**
-     * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
+     * The endpoint identification algorithm to validate server hostname using server certificate.
      */
-    public void setCompressionCodec(String compressionCodec) {
-        this.compressionCodec = compressionCodec;
+    public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+        this.sslEndpointAlgorithm = sslEndpointAlgorithm;
     }
 
-    public String getCompressedTopics() {
-        return compressedTopics;
+    public String getSslKeymanagerAlgorithm() {
+        return sslKeymanagerAlgorithm;
     }
 
     /**
-     * This parameter allows you to set whether compression should be turned on for particular topics.
-     * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any.
-     * If the list of compressed topics is empty, then enable the specified compression codec for all topics.
-     * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+     * The algorithm used by key manager factory for SSL connections. Default value is the key
+     * manager factory algorithm configured for the Java Virtual Machine.
      */
-    public void setCompressedTopics(String compressedTopics) {
-        this.compressedTopics = compressedTopics;
+    public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+        this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
     }
 
-    public Integer getMessageSendMaxRetries() {
-        return messageSendMaxRetries;
+    public String getSslTrustmanagerAlgorithm() {
+        return sslTrustmanagerAlgorithm;
     }
 
     /**
-     * This property will cause the producer to automatically retry a failed send request.
-     * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here
-     * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
+     * The algorithm used by trust manager factory for SSL connections. Default value is the
+     * trust manager factory algorithm configured for the Java Virtual Machine.
      */
-    public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
-        this.messageSendMaxRetries = messageSendMaxRetries;
+    public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+        this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
     }
 
-    public Integer getRetryBackoffMs() {
-        return retryBackoffMs;
+    public String getSslEnabledProtocols() {
+        return sslEnabledProtocols;
     }
 
     /**
-     * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
-     * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
+     * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.
      */
-    public void setRetryBackoffMs(Integer retryBackoffMs) {
-        this.retryBackoffMs = retryBackoffMs;
+    public void setSslEnabledProtocols(String sslEnabledProtocols) {
+        this.sslEnabledProtocols = sslEnabledProtocols;
     }
 
-    public Integer getTopicMetadataRefreshIntervalMs() {
-        return topicMetadataRefreshIntervalMs;
+    public String getSslKeystoreType() {
+        return sslKeystoreType;
     }
 
     /**
-     * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing,
-     * leader not available...). It will also poll regularly (default: every 10min so 600000ms).
-     * If you set this to a negative value, metadata will only get refreshed on failure.
-     * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
-     * Important note: the refresh happen only AFTER the message is sent, so if the producer never
-     * sends a message the metadata is never refreshed
+     * The file format of the key store file. This is optional for the client. Default value is JKS.
      */
-    public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
-        this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+    public void setSslKeystoreType(String sslKeystoreType) {
+        this.sslKeystoreType = sslKeystoreType;
     }
 
-    public Integer getSendBufferBytes() {
-        return sendBufferBytes;
+    public String getSslProtocol() {
+        return sslProtocol;
     }
 
     /**
-     * Socket write buffer size
+     * The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases.
+     * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs,
+     * but their usage is discouraged due to known security vulnerabilities.
      */
-    public void setSendBufferBytes(Integer sendBufferBytes) {
-        this.sendBufferBytes = sendBufferBytes;
+    public void setSslProtocol(String sslProtocol) {
+        this.sslProtocol = sslProtocol;
+    }
+
+    public String getSslProvider() {
+        return sslProvider;
+    }
+
+    /**
+     * The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
+     */
+    public void setSslProvider(String sslProvider) {
+        this.sslProvider = sslProvider;
+    }
+
+    public String getSslTruststoreType() {
+        return sslTruststoreType;
+    }
+
+    /**
+     * The file format of the trust store file. Default value is JKS.
+     */
+    public void setSslTruststoreType(String sslTruststoreType) {
+        this.sslTruststoreType = sslTruststoreType;
+    }
+
+    public String getSaslKerberosServiceName() {
+        return saslKerberosServiceName;
+    }
+
+    /**
+     * The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS
+     * config or in Kafka's config.
+     */
+    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+        this.saslKerberosServiceName = saslKerberosServiceName;
+    }
+
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+    /**
+     * Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
+     */
+    public void setSecurityProtocol(String securityProtocol) {
+        this.securityProtocol = securityProtocol;
+    }
+
+    public String getSslKeyPassword() {
+        return sslKeyPassword;
+    }
+
+    /**
+     * The password of the private key in the key store file. This is optional for the client.
+     */
+    public void setSslKeyPassword(String sslKeyPassword) {
+        this.sslKeyPassword = sslKeyPassword;
+    }
+
+    public String getSslKeystoreLocation() {
+        return sslKeystoreLocation;
     }
 
-    public short getRequestRequiredAcks() {
+    /**
+     * The location of the key store file. This is optional for the client and can be used for
+     * two-way client authentication.
+     */
+    public void setSslKeystoreLocation(String sslKeystoreLocation) {
+        this.sslKeystoreLocation = sslKeystoreLocation;
+    }
+
+    public String getSslKeystorePassword() {
+        return sslKeystorePassword;
+    }
+
+    /**
+     * The store password for the key store file. This is optional for the client and only needed
+     * if ssl.keystore.location is configured.
+     */
+    public void setSslKeystorePassword(String sslKeystorePassword) {
+        this.sslKeystorePassword = sslKeystorePassword;
+    }
+
+    public String getSslTruststoreLocation() {
+        return sslTruststoreLocation;
+    }
+
+    /**
+     * The location of the trust store file.
+     */
+    public void setSslTruststoreLocation(String sslTruststoreLocation) {
+        this.sslTruststoreLocation = sslTruststoreLocation;
+    }
+
+    public String getSslTruststorePassword() {
+        return sslTruststorePassword;
+    }
+
+
+    /**
+     * The password for the trust store file.
+     */
+    public void setSslTruststorePassword(String sslTruststorePassword) {
+        this.sslTruststorePassword = sslTruststorePassword;
+    }
+
+    public Integer getBufferMemorySize() {
+        return bufferMemorySize;
+    }
+
+    /**
+     * The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+     * If records are sent faster than they can be delivered to the server the producer will either block
+     * or throw an exception based on the preference specified by block.on.buffer.full.This setting should
+     * correspond roughly to the total memory the producer will use, but is not a hard bound since not all
+     * memory the producer uses is used for buffering. Some additional memory will be used for compression
+     * (if compression is enabled) as well as for maintaining in-flight requests.
+     */
+    public void setBufferMemorySize(Integer bufferMemorySize) {
+        this.bufferMemorySize = bufferMemorySize;
+    }
+
+    public Boolean getBlockOnBufferFull() {
+        return blockOnBufferFull;
+    }
+
+    /**
+     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
+     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
+     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
+     * a BufferExhaustedException if a record is sent and the buffer space is full.
+     */
+    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+        this.blockOnBufferFull = blockOnBufferFull;
+    }
+
+    public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
 
     /**
-     * This value controls when a produce request is considered completed. Specifically,
-     * how many other brokers must have committed the data to their log and acknowledged this to the leader?
-     * Typical values are (0, 1 or -1):
-     * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
-     * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
-     * 1, which means that the producer gets an acknowledgement after the leader replica has received the data.
-     * This option provides better durability as the client waits until the server acknowledges the request as successful
-     * (only messages that were written to the now-dead leader but not yet replicated will be lost).
-     * -1, The producer gets an acknowledgement after all in-sync replicas have received the data.
-     * This option provides the greatest level of durability.
-     * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may,
-     * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas
-     * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting.
-     * Please read the Replication section of the design documentation for a more in-depth discussion.
+     * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+     * This controls the durability of records that are sent. The following settings are common:
+     * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all.
+     * The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server
+     * has received the record in this case, and the retries configuration will not take effect (as the client won't generally
+     * know of any failures). The offset given back for each record will always be set to -1.
+     * acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement
+     * from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have
+     * replicated it then the record will be lost.
+     * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
+     * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
      */
-    public void setRequestRequiredAcks(short requestRequiredAcks) {
+    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
 
-    public Integer getRequestTimeoutMs() {
-        return requestTimeoutMs;
+    public Integer getRetries() {
+        return retries;
     }
 
     /**
-     * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+     * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
+     * Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially
+     * change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second
+     * succeeds, then the second record may appear first.
      */
-    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
-        this.requestTimeoutMs = requestTimeoutMs;
+    public void setRetries(Integer retries) {
+        this.retries = retries;
     }
 
-    public Integer getQueueBufferingMaxMs() {
-        return queueBufferingMaxMs;
+    public Integer getProducerBatchSize() {
+        return producerBatchSize;
     }
 
     /**
-     * Maximum time to buffer data when using async mode.
-     * For example a setting of 100 will try to batch together 100ms of messages to send at once.
-     * This will improve throughput but adds message delivery latency due to the buffering.
+     * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+     * This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
+     * No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each
+     * partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero
+     * will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the
+     * specified batch size in anticipation of additional records.
      */
-    public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
-        this.queueBufferingMaxMs = queueBufferingMaxMs;
+    public void setProducerBatchSize(Integer producerBatchSize) {
+        this.producerBatchSize = producerBatchSize;
     }
 
-    public Integer getQueueBufferingMaxMessages() {
-        return queueBufferingMaxMessages;
+    public Integer getConnectionMaxIdleMs() {
+        return connectionMaxIdleMs;
     }
 
     /**
-     * The maximum number of unsent messages that can be queued up the producer when using async
-     * mode before either the producer must be blocked or data must be dropped.
+     * Close idle connections after the number of milliseconds specified by this config.
      */
-    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
-        this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+    public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+        this.connectionMaxIdleMs = connectionMaxIdleMs;
     }
 
-    public Integer getQueueEnqueueTimeoutMs() {
-        return queueEnqueueTimeoutMs;
+    public Integer getLingerMs() {
+        return lingerMs;
     }
 
     /**
-     * The amount of time to block before dropping messages when running in async mode and the buffer has reached
-     * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full
-     * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.
+     * The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this
+     * occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce
+     * the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is,
+     * rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that
+     * the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on
+     * the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting,
+     * however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more
+     * records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the
+     * number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
      */
-    public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
-        this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+    public void setLingerMs(Integer lingerMs) {
+        this.lingerMs = lingerMs;
     }
 
-    public Integer getBatchNumMessages() {
-        return batchNumMessages;
+    public Integer getMaxBlockMs() {
+        return maxBlockMs;
     }
 
     /**
-     * The number of messages to send in one batch when using async mode.
-     * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.
+     * The configuration controls how long sending to Kafka will block. Sending can block for multiple
+     * reasons, e.g. the buffer being full or metadata being unavailable. This configuration imposes a maximum limit on the total time spent
+     * in fetching metadata, serialization of key and value, partitioning and allocation of buffer memory when doing a send(). In the case of
+     * partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata.
      */
-    public void setBatchNumMessages(Integer batchNumMessages) {
-        this.batchNumMessages = batchNumMessages;
+    public void setMaxBlockMs(Integer maxBlockMs) {
+        this.maxBlockMs = maxBlockMs;
     }
 
-    public String getSerializerClass() {
-        return serializerClass;
+    public Integer getMaxRequestSize() {
+        return maxRequestSize;
     }
 
     /**
-     * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
-     * The default class is kafka.serializer.DefaultEncoder
+     * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size
+     * which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid
+     * sending huge requests.
      */
-    public void setSerializerClass(String serializerClass) {
-        this.serializerClass = serializerClass;
+    public void setMaxRequestSize(Integer maxRequestSize) {
+        this.maxRequestSize = maxRequestSize;
     }
 
-    public String getKeySerializerClass() {
-        return keySerializerClass;
+    public Integer getReceiveBufferBytes() {
+        return receiveBufferBytes;
     }
 
     /**
-     * The serializer class for keys (defaults to the same as for messages if nothing is given).
+     * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
      */
-    public void setKeySerializerClass(String keySerializerClass) {
-        this.keySerializerClass = keySerializerClass;
+    public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+        this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public String getOffsetsStorage() {
-        return offsetsStorage;
+    public Integer getTimeoutMs() {
+        return timeoutMs;
     }
 
     /**
-     * Select where offsets should be stored (zookeeper or kafka).
+     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
+     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
+     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
+     * the network latency of the request.
      */
-    public void setOffsetsStorage(String offsetsStorage) {
-        this.offsetsStorage = offsetsStorage;
+    public void setTimeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
     }
 
-    public Boolean isDualCommitEnabled() {
-        return dualCommitEnabled;
+    public Integer getMaxInFlightRequest() {
+        return maxInFlightRequest;
     }
 
     /**
-     * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
-     * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
-     * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
-     * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+     * The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting
+     * is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
      */
-    public void setDualCommitEnabled(Boolean dualCommitEnabled) {
-        this.dualCommitEnabled = dualCommitEnabled;
+    public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+        this.maxInFlightRequest = maxInFlightRequest;
+    }
+
+    public Integer getMetadataFetchTimeoutMs() {
+        return metadataFetchTimeoutMs;
     }
 
     /**
-     * Returns a copy of this configuration
+     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
+     * This configuration controls the maximum amount of time to block waiting for this fetch to succeed before throwing an exception back to the client.
      */
-    public KafkaConfiguration copy() {
-        try {
-            return (KafkaConfiguration)clone();
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeCamelException(e);
-        }
+    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
+    }
+
+    public Integer getMetadataMaxAgeMs() {
+        return metadataMaxAgeMs;
+    }
+
+    /**
+     * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership
+     * changes to proactively discover any new brokers or partitions.
+     */
+    public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+        this.metadataMaxAgeMs = metadataMaxAgeMs;
+    }
+
+    public String getMetricReporters() {
+        return metricReporters;
+    }
+
+    /**
+     * A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be
+     * notified of new metric creation. The JmxReporter is always included to register JMX statistics.
+     */
+    public void setMetricReporters(String metricReporters) {
+        this.metricReporters = metricReporters;
+    }
+
+    public Integer getNoOfMetricsSample() {
+        return noOfMetricsSample;
+    }
+
+    /**
+     * The number of samples maintained to compute metrics.
+     */
+    public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+        this.noOfMetricsSample = noOfMetricsSample;
+    }
+
+    public Integer getMetricsSampleWindowMs() {
+        return metricsSampleWindowMs;
+    }
+
+    /**
+     * The window of time a metrics sample is computed over.
+     */
+    public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+        this.metricsSampleWindowMs = metricsSampleWindowMs;
+    }
+
+    public Integer getReconnectBackoffMs() {
+        return reconnectBackoffMs;
+    }
+
+    /**
+     * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host
+     * in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+     */
+    public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+        this.reconnectBackoffMs = reconnectBackoffMs;
+    }
+
+    public Integer getHeartbeatIntervalMs() {
+        return heartbeatIntervalMs;
+    }
+
+    /**
+     * The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
+     * Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new
+     * consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set
+     * no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
+     */
+    public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+        this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    public Integer getMaxPartitionFetchBytes() {
+        return maxPartitionFetchBytes;
+    }
+
+    /**
+     * The maximum amount of data per-partition the server will return. The maximum total memory used for
+     * a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the
+     * maximum message size the server allows or else it is possible for the producer to send messages larger
+     * than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message
+     * on a certain partition.
+     */
+    public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+        this.maxPartitionFetchBytes = maxPartitionFetchBytes;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * The timeout used to detect failures when using Kafka's group management facilities.
+     */
+    public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+        this.sessionTimeoutMs = sessionTimeoutMs;
+    }
+
+    public String getPartitionAssignor() {
+        return partitionAssignor;
+    }
+
+    /**
+     * The class name of the partition assignment strategy that the client will use to distribute
+     * partition ownership amongst consumer instances when group management is used
+     */
+    public void setPartitionAssignor(String partitionAssignor) {
+        this.partitionAssignor = partitionAssignor;
+    }
+
+    public Integer getConsumerRequestTimeoutMs() {
+        return consumerRequestTimeoutMs;
+    }
+
+    /**
+     * The configuration controls the maximum amount of time the client will wait for the response
+     * of a request. If the response is not received before the timeout elapses the client will resend
+     * the request if necessary or fail the request if retries are exhausted.
+     */
+    public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+        this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
+    }
+
+    public Boolean getCheckCrcs() {
+        return checkCrcs;
+    }
+
+    /**
+     * Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
+     * corruption to the messages occurred. This check adds some overhead, so it may be disabled in
+     * cases seeking extreme performance.
+     */
+    public void setCheckCrcs(Boolean checkCrcs) {
+        this.checkCrcs = checkCrcs;
+    }
+
+    public String getKeyDeserializer() {
+        return keyDeserializer;
+    }
+
+    /**
+     * Deserializer class for key that implements the Deserializer interface.
+     */
+    public void setKeyDeserializer(String keyDeserializer) {
+        this.keyDeserializer = keyDeserializer;
+    }
+
+    public String getValueDeserializer() {
+        return valueDeserializer;
+    }
+
+    /**
+     * Deserializer class for value that implements the Deserializer interface.
+     */
+    public void setValueDeserializer(String valueDeserializer) {
+        this.valueDeserializer = valueDeserializer;
     }
 
 }
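
For orientation, a sketch of how properties assembled this way reach the new Java producer client. The createProducerProperties call is assumed to mirror the consumer-side method above, and the broker, topic and record values are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSketch {
        public static void main(String[] args) {
            // assumed producer-side counterpart of createConsumerProperties
            Properties props = new KafkaConfiguration().createProducerProperties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // requestRequiredAcks=-1 maps to the client's acks=-1 ("all"): wait for the full ISR
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("test", "key", "value"));
            producer.close();
        }
    }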

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index d3ff482..db99a09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -16,21 +16,20 @@
  */
 package org.apache.camel.component.kafka;
 
-/**
- *
- */
 public final class KafkaConstants {
 
-    public static final String DEFAULT_GROUP = "group1";
-
     public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
-    public static final String PARTITION = "kafka.PARTITION";
-    public static final String KEY = "kafka.KEY";
+    public static final String PARTITION = "kafka.PARTITION";
+    public static final String KEY = "kafka.KEY";
     public static final String TOPIC = "kafka.TOPIC";
     public static final String OFFSET = "kafka.OFFSET";
 
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
     public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+    public static final String KAFKA_DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+    public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+    public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor";
 
     private KafkaConstants() {
         // Utility class

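A short usage sketch of these header constants on the producer side; the route, endpoint and header values are illustrative, only the constant names come from the class above:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaHeaderRouteSketch extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:start")
                // PARTITION_KEY selects the target partition, KEY becomes the record key
                .setHeader(KafkaConstants.PARTITION_KEY, constant("0"))
                .setHeader(KafkaConstants.KEY, constant("order-1"))
                .to("kafka:localhost:9092?topic=test");
        }
    }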

[2/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.

Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 94893fb..e65ed3b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,30 +16,19 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Arrays;
 import java.util.Properties;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class KafkaConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -47,15 +36,14 @@ public class KafkaConsumer extends DefaultConsumer {
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
-    private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;
-
+    
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
-        this.consumerBarriers = new HashMap<ConsumerConnector, CyclicBarrier>();
-        if (endpoint.getZookeeperConnect() == null) {
-            throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
+
+        if (endpoint.getBrokers() == null) {
+            throw new IllegalArgumentException("BootStrap servers must be specified");
         }
         if (endpoint.getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
@@ -64,57 +52,25 @@ public class KafkaConsumer extends DefaultConsumer {
 
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
-        props.put("zookeeper.connect", endpoint.getZookeeperConnect());
-        props.put("group.id", endpoint.getGroupId());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId());
         return props;
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        log.info("Starting Kafka consumer");
-
+        LOG.info("Starting Kafka consumer");
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
-            ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
-            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-            topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
-            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
-
-            // commit periodically
-            if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
-                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
-                        && endpoint.getConsumerStreams() > 1) {
-                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
-                }
-                CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
-                for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new BatchingConsumerTask(stream, barrier));
-                }
-                consumerBarriers.put(consumer, barrier);
-            } else {
-                // auto commit
-                for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new AutoCommitConsumerTask(consumer, stream));
-                }
-                consumerBarriers.put(consumer, null);
-            }
+            executor.submit(new KafkaFetchRecords(endpoint.getTopic(), String.valueOf(i), getProps()));
         }
-
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        log.info("Stopping Kafka consumer");
-
-        for (ConsumerConnector consumer : consumerBarriers.keySet()) {
-            if (consumer != null) {
-                consumer.shutdown();
-            }
-        }
-        consumerBarriers.clear();
+        LOG.info("Stopping Kafka consumer");
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
@@ -126,102 +82,57 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = null;
     }
 
-    class BatchingConsumerTask implements Runnable {
+    class KafkaFetchRecords implements Runnable {
 
-        private KafkaStream<byte[], byte[]> stream;
-        private CyclicBarrier barrier;
+        private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+        private final String topicName;
+        private final String threadId;
+        private final Properties kafkaProps;
 
-        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
-            this.stream = stream;
-            this.barrier = barrier;
+        public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+            this.topicName = topicName;
+            this.threadId = topicName + "-" + "Thread " + id;
+            this.kafkaProps = kafkaProps;
+            this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
         }
 
+        @Override
+        @SuppressWarnings("unchecked")
         public void run() {
-
             int processed = 0;
-            boolean consumerTimeout;
-            MessageAndMetadata<byte[], byte[]> mm;
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            boolean hasNext = true;
-            while (hasNext) {
-                try {
-                    consumerTimeout = false;
-                    // only poll the next message if we are allowed to run and are not suspending
-                    if (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
-                        mm = it.next();
-                        Exchange exchange = endpoint.createKafkaExchange(mm);
+            try {
+                LOG.debug("Subscribing {} to topic {}", threadId, topicName);
+                consumer.subscribe(Arrays.asList(topicName));
+                while (isRunAllowed() && !isSuspendingOrSuspended()) {
+                    ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
+                    for (ConsumerRecord<Object, Object> record : records) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
+                        }
+                        Exchange exchange = endpoint.createKafkaExchange(record);
                         try {
                             processor.process(exchange);
                         } catch (Exception e) {
-                            LOG.error(e.getMessage(), e);
+                            getExceptionHandler().handleException("Error during processing", exchange, e);
                         }
                         processed++;
-                    } else {
-                        // we don't need to process the message
-                        hasNext = false;
-                    }
-                } catch (ConsumerTimeoutException e) {
-                    LOG.debug("Consumer timeout occurred due " + e.getMessage(), e);
-                    consumerTimeout = true;
-                }
-
-                if (processed >= endpoint.getBatchSize() || consumerTimeout 
-                    || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round
-                    try {
-                        barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
-                        if (!consumerTimeout) {
-                            processed = 0;
+                        // when auto commit is disabled, commit offsets manually once a batch completes
+                        if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
+                            if (processed >= endpoint.getBatchSize()) {
+                                consumer.commitSync();
+                                processed = 0;
+                            }
                         }
-                    } catch (Exception e) {
-                        getExceptionHandler().handleException("Error waiting for batch to complete", e);
-                        break;
                     }
                 }
+                LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
+                consumer.unsubscribe();
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
             }
         }
-    }
-
-    class CommitOffsetTask implements Runnable {
-
-        private final ConsumerConnector consumer;
-
-        public CommitOffsetTask(ConsumerConnector consumer) {
-            this.consumer = consumer;
-        }
 
-        @Override
-        public void run() {
-            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
-            consumer.commitOffsets();
-        }
     }
 
-    class AutoCommitConsumerTask implements Runnable {
-
-        private final ConsumerConnector consumer;
-        private KafkaStream<byte[], byte[]> stream;
-
-        public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
-            this.consumer = consumer;
-            this.stream = stream;
-        }
-
-        public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            // only poll the next message if we are allowed to run and are not suspending
-            while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) {
-                MessageAndMetadata<byte[], byte[]> mm = it.next();
-                Exchange exchange = endpoint.createKafkaExchange(mm);
-                try {
-                    processor.process(exchange);
-                } catch (Exception e) {
-                    getExceptionHandler().handleException("Error during processing", exchange, e);
-                }
-            }
-            // no more data so commit offset
-            LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer));
-            consumer.commitOffsets();
-        }
-    }
 }
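
For readers new to the java client, the consumer rewrite above boils down to a
subscribe/poll/commitSync loop. A minimal standalone sketch of that pattern
follows; the broker address, group id, topic name and the DemoPollLoop class
are illustrative only, not part of the patch:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class DemoPollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually, per batch
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            try {
                consumer.subscribe(Arrays.asList("test")); // topic name is illustrative
                int processed = 0;
                int batchSize = 100; // stands in for endpoint.getBatchSize()
                for (int i = 0; i < 10; i++) { // poll a few times for the demo
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.offset() + ": " + record.value());
                        if (++processed >= batchSize) {
                            consumer.commitSync(); // synchronous offset commit, as in KafkaFetchRecords
                            processed = 0;
                        }
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }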
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 760ec19a..5a56c39 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -16,10 +16,9 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
-import kafka.message.MessageAndMetadata;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -29,6 +28,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers.
@@ -38,8 +38,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-
-    @UriParam(description = "If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.", defaultValue = "false")
+    @UriParam
     private boolean bridgeEndpoint;
 
     public KafkaEndpoint() {
@@ -73,30 +72,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @Override
     public Producer createProducer() throws Exception {
-        String msgClassName = getConfiguration().getSerializerClass();
-        String keyClassName = getConfiguration().getKeySerializerClass();
-        if (msgClassName == null) {
-            msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER;
-        }
-        if (keyClassName == null) {
-            keyClassName = msgClassName;
-        }
-
-        ClassLoader cl = getClass().getClassLoader();
-
-        Class<?> k;
-        try {
-            k = cl.loadClass(keyClassName);
-        } catch (ClassNotFoundException x) {
-            k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName);
-        }
-        Class<?> v;
-        try {
-            v = cl.loadClass(msgClassName);
-        } catch (ClassNotFoundException x) {
-            v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName);
-        }
-        return createProducer(k, v, this);
+        return createProducer(this);
     }
 
     @Override
@@ -104,411 +80,597 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return true;
     }
 
+    @Override
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+
     public ExecutorService createExecutor() {
         return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
     }
 
-    public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
+    public Exchange createKafkaExchange(ConsumerRecord record) {
         Exchange exchange = super.createExchange();
 
         Message message = exchange.getIn();
-        message.setHeader(KafkaConstants.PARTITION, mm.partition());
-        message.setHeader(KafkaConstants.TOPIC, mm.topic());
-        message.setHeader(KafkaConstants.OFFSET, mm.offset());
-        if (mm.key() != null) {
-            message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setHeader(KafkaConstants.PARTITION, record.partition());
+        message.setHeader(KafkaConstants.TOPIC, record.topic());
+        message.setHeader(KafkaConstants.OFFSET, record.offset());
+        if (record.key() != null) {
+            message.setHeader(KafkaConstants.KEY, record.key());
         }
-        message.setBody(mm.message());
+        message.setBody(record.value());
 
         return exchange;
     }
 
-    protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) {
-        return new KafkaProducer<K, V>(endpoint);
+    protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
+        return new KafkaProducer(endpoint);
     }
 
     // Delegated properties from the configuration
     //-------------------------------------------------------------------------
 
-    public String getZookeeperConnect() {
-        return configuration.getZookeeperConnect();
+    public Properties createProducerProperties() {
+        return configuration.createProducerProperties();
+    }
+
+    public void setValueDeserializer(String valueDeserializer) {
+        configuration.setValueDeserializer(valueDeserializer);
     }
 
-    public void setZookeeperConnect(String zookeeperConnect) {
-        configuration.setZookeeperConnect(zookeeperConnect);
+    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+        configuration.setRequestTimeoutMs(requestTimeoutMs);
     }
 
-    public String getZookeeperHost() {
-        return configuration.getZookeeperHost();
+    public void setProducerBatchSize(Integer producerBatchSize) {
+        configuration.setProducerBatchSize(producerBatchSize);
     }
 
-    public void setZookeeperHost(String zookeeperHost) {
-        configuration.setZookeeperHost(zookeeperHost);
+    public void setRetryBackoffMs(Integer retryBackoffMs) {
+        configuration.setRetryBackoffMs(retryBackoffMs);
     }
 
-    public int getZookeeperPort() {
-        return configuration.getZookeeperPort();
+    public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+        configuration.setNoOfMetricsSample(noOfMetricsSample);
     }
 
-    public void setZookeeperPort(int zookeeperPort) {
-        configuration.setZookeeperPort(zookeeperPort);
+    public String getMetricReporters() {
+        return configuration.getMetricReporters();
     }
 
-    public String getGroupId() {
-        return configuration.getGroupId();
+    public void setSslKeystoreType(String sslKeystoreType) {
+        configuration.setSslKeystoreType(sslKeystoreType);
+    }
+
+    public void setSslCipherSuites(String sslCipherSuites) {
+        configuration.setSslCipherSuites(sslCipherSuites);
+    }
+
+    public void setClientId(String clientId) {
+        configuration.setClientId(clientId);
+    }
+
+    public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+        configuration.setMetricsSampleWindowMs(metricsSampleWindowMs);
+    }
+
+    public String getKeyDeserializer() {
+        return configuration.getKeyDeserializer();
+    }
+
+    public int getConsumersCount() {
+        return configuration.getConsumersCount();
+    }
+
+    public String getSslKeyPassword() {
+        return configuration.getSslKeyPassword();
+    }
+
+    public void setSendBufferBytes(Integer sendBufferBytes) {
+        configuration.setSendBufferBytes(sendBufferBytes);
+    }
+
+    public Boolean isAutoCommitEnable() {
+        return configuration.isAutoCommitEnable();
+    }
+
+    public Integer getMaxBlockMs() {
+        return configuration.getMaxBlockMs();
+    }
+
+    public String getConsumerId() {
+        return configuration.getConsumerId();
+    }
+
+    public void setSslProtocol(String sslProtocol) {
+        configuration.setSslProtocol(sslProtocol);
+    }
+
+    public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+        configuration.setReceiveBufferBytes(receiveBufferBytes);
+    }
+
+    public Boolean getCheckCrcs() {
+        return configuration.getCheckCrcs();
     }
 
     public void setGroupId(String groupId) {
         configuration.setGroupId(groupId);
     }
 
-    public String getPartitioner() {
-        return configuration.getPartitioner();
+    public String getCompressionCodec() {
+        return configuration.getCompressionCodec();
     }
 
-    public void setPartitioner(String partitioner) {
-        configuration.setPartitioner(partitioner);
+    public String getGroupId() {
+        return configuration.getGroupId();
     }
 
-    public String getTopic() {
-        return configuration.getTopic();
+    public void setSslTruststoreLocation(String sslTruststoreLocation) {
+        configuration.setSslTruststoreLocation(sslTruststoreLocation);
     }
 
-    public void setTopic(String topic) {
-        configuration.setTopic(topic);
+    public String getKerberosInitCmd() {
+        return configuration.getKerberosInitCmd();
     }
 
-    public String getBrokers() {
-        return configuration.getBrokers();
+    public String getAutoOffsetReset() {
+        return configuration.getAutoOffsetReset();
+    }
+
+    public void setAutoCommitEnable(Boolean autoCommitEnable) {
+        configuration.setAutoCommitEnable(autoCommitEnable);
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        configuration.setSerializerClass(serializerClass);
+    }
+
+    public Integer getQueueBufferingMaxMessages() {
+        return configuration.getQueueBufferingMaxMessages();
+    }
+
+    public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+        configuration.setSslEndpointAlgorithm(sslEndpointAlgorithm);
+    }
+
+    public void setRetries(Integer retries) {
+        configuration.setRetries(retries);
+    }
+
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        configuration.setAutoOffsetReset(autoOffsetReset);
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return configuration.getSessionTimeoutMs();
+    }
+
+    public Integer getBufferMemorySize() {
+        return configuration.getBufferMemorySize();
+    }
+
+    public String getKeySerializerClass() {
+        return configuration.getKeySerializerClass();
+    }
+
+    public void setSslProvider(String sslProvider) {
+        configuration.setSslProvider(sslProvider);
+    }
+
+    public void setFetchMinBytes(Integer fetchMinBytes) {
+        configuration.setFetchMinBytes(fetchMinBytes);
+    }
+
+    public Integer getAutoCommitIntervalMs() {
+        return configuration.getAutoCommitIntervalMs();
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        configuration.setKeySerializerClass(keySerializerClass);
+    }
+
+    public Integer getConnectionMaxIdleMs() {
+        return configuration.getConnectionMaxIdleMs();
+    }
+
+    public Integer getReceiveBufferBytes() {
+        return configuration.getReceiveBufferBytes();
     }
 
     public void setBrokers(String brokers) {
         configuration.setBrokers(brokers);
     }
 
-    public int getConsumerStreams() {
-        return configuration.getConsumerStreams();
+    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
     }
 
-    public void setConsumerStreams(int consumerStreams) {
-        configuration.setConsumerStreams(consumerStreams);
+    public String getValueDeserializer() {
+        return configuration.getValueDeserializer();
     }
 
-    public int getBatchSize() {
-        return configuration.getBatchSize();
+    public String getPartitioner() {
+        return configuration.getPartitioner();
     }
 
-    public void setBatchSize(int batchSize) {
-        this.configuration.setBatchSize(batchSize);
+    public String getSslTruststoreLocation() {
+        return configuration.getSslTruststoreLocation();
+    }
+
+    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+        configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+    }
+
+    public String getSslProvider() {
+        return configuration.getSslProvider();
+    }
+
+    public void setMetricReporters(String metricReporters) {
+        configuration.setMetricReporters(metricReporters);
+    }
+
+    public void setSslTruststorePassword(String sslTruststorePassword) {
+        configuration.setSslTruststorePassword(sslTruststorePassword);
+    }
+
+    public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+        configuration.setMaxInFlightRequest(maxInFlightRequest);
+    }
+
+    public String getTopic() {
+        return configuration.getTopic();
     }
 
     public int getBarrierAwaitTimeoutMs() {
         return configuration.getBarrierAwaitTimeoutMs();
     }
 
-    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
-        this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+    public Integer getFetchMinBytes() {
+        return configuration.getFetchMinBytes();
     }
 
-    public int getConsumersCount() {
-        return this.configuration.getConsumersCount();
+    public Integer getHeartbeatIntervalMs() {
+        return configuration.getHeartbeatIntervalMs();
     }
 
-    public void setConsumersCount(int consumersCount) {
-        this.configuration.setConsumersCount(consumersCount);
+    public void setKeyDeserializer(String keyDeserializer) {
+        configuration.setKeyDeserializer(keyDeserializer);
     }
 
-    public void setConsumerTimeoutMs(int consumerTimeoutMs) {
-        configuration.setConsumerTimeoutMs(consumerTimeoutMs);
+    public Integer getMaxRequestSize() {
+        return configuration.getMaxRequestSize();
     }
 
-    public void setSerializerClass(String serializerClass) {
-        configuration.setSerializerClass(serializerClass);
+    public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+        configuration.setMetadataMaxAgeMs(metadataMaxAgeMs);
     }
 
-    public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) {
-        configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
+    public String getSslKeystoreType() {
+        return configuration.getSslKeystoreType();
     }
 
-    public int getFetchWaitMaxMs() {
-        return configuration.getFetchWaitMaxMs();
+    public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+        configuration.setKerberosRenewWindowFactor(kerberosRenewWindowFactor);
     }
 
-    public Integer getZookeeperConnectionTimeoutMs() {
-        return configuration.getZookeeperConnectionTimeoutMs();
+    public Integer getKerberosBeforeReloginMinTime() {
+        return configuration.getKerberosBeforeReloginMinTime();
     }
 
-    public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
-        configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs);
+    public String getSslEnabledProtocols() {
+        return configuration.getSslEnabledProtocols();
     }
 
-    public void setMessageSendMaxRetries(int messageSendMaxRetries) {
-        configuration.setMessageSendMaxRetries(messageSendMaxRetries);
+    public Integer getMaxInFlightRequest() {
+        return configuration.getMaxInFlightRequest();
     }
 
-    public int getQueueBufferingMaxMs() {
-        return configuration.getQueueBufferingMaxMs();
+    public Integer getProducerBatchSize() {
+        return configuration.getProducerBatchSize();
     }
 
-    public void setRequestRequiredAcks(short requestRequiredAcks) {
-        configuration.setRequestRequiredAcks(requestRequiredAcks);
+    public void setSslKeystorePassword(String sslKeystorePassword) {
+        configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Integer getRebalanceBackoffMs() {
-        return configuration.getRebalanceBackoffMs();
+    public Boolean getBlockOnBufferFull() {
+        return configuration.getBlockOnBufferFull();
     }
 
-    public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) {
-        configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs);
+    public void setCheckCrcs(Boolean checkCrcs) {
+        configuration.setCheckCrcs(checkCrcs);
     }
 
-    public int getFetchMessageMaxBytes() {
-        return configuration.getFetchMessageMaxBytes();
+    public int getConsumerStreams() {
+        return configuration.getConsumerStreams();
     }
 
-    public int getQueuedMaxMessages() {
-        return configuration.getQueuedMaxMessageChunks();
+    public void setConsumersCount(int consumersCount) {
+        configuration.setConsumersCount(consumersCount);
     }
 
-    public int getAutoCommitIntervalMs() {
-        return configuration.getAutoCommitIntervalMs();
+    public int getBatchSize() {
+        return configuration.getBatchSize();
     }
 
-    public void setSocketTimeoutMs(int socketTimeoutMs) {
-        configuration.setSocketTimeoutMs(socketTimeoutMs);
+    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+        configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
     }
 
-    public void setAutoCommitIntervalMs(int autoCommitIntervalMs) {
-        configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
+    public void setSslTruststoreType(String sslTruststoreType) {
+        configuration.setSslTruststoreType(sslTruststoreType);
     }
 
-    public void setRequestTimeoutMs(int requestTimeoutMs) {
-        configuration.setRequestTimeoutMs(requestTimeoutMs);
+    public Integer getConsumerRequestTimeoutMs() {
+        return configuration.getConsumerRequestTimeoutMs();
     }
 
-    public void setCompressedTopics(String compressedTopics) {
-        configuration.setCompressedTopics(compressedTopics);
+    public String getSslKeystorePassword() {
+        return configuration.getSslKeystorePassword();
     }
 
-    public int getSocketReceiveBufferBytes() {
-        return configuration.getSocketReceiveBufferBytes();
+    public void setSslKeyPassword(String sslKeyPassword) {
+        configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setSendBufferBytes(int sendBufferBytes) {
-        configuration.setSendBufferBytes(sendBufferBytes);
+    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+        configuration.setBlockOnBufferFull(blockOnBufferFull);
     }
 
-    public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) {
-        configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes);
+    public Integer getRequestRequiredAcks() {
+        return configuration.getRequestRequiredAcks();
     }
 
-    public int getRefreshLeaderBackoffMs() {
-        return configuration.getRefreshLeaderBackoffMs();
+    public Double getKerberosRenewWindowFactor() {
+        return configuration.getKerberosRenewWindowFactor();
     }
 
-    public void setFetchWaitMaxMs(int fetchWaitMaxMs) {
-        configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
+    public void setKerberosInitCmd(String kerberosInitCmd) {
+        configuration.setKerberosInitCmd(kerberosInitCmd);
     }
 
-    public int getTopicMetadataRefreshIntervalMs() {
-        return configuration.getTopicMetadataRefreshIntervalMs();
+    public Integer getRetryBackoffMs() {
+        return configuration.getRetryBackoffMs();
     }
 
-    public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
-        configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
+    public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+        configuration.setSslTrustmanagerAlgorithm(sslTrustmanagerAlgorithm);
     }
 
-    public Integer getConsumerTimeoutMs() {
-        return configuration.getConsumerTimeoutMs();
+    public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+        configuration.setConsumerRequestTimeoutMs(consumerRequestTimeoutMs);
     }
 
-    public void setAutoCommitEnable(boolean autoCommitEnable) {
-        configuration.setAutoCommitEnable(autoCommitEnable);
+    public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+        configuration.setReconnectBackoffMs(reconnectBackoffMs);
     }
 
-    public String getCompressionCodec() {
-        return configuration.getCompressionCodec();
+    public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+        configuration.setKerberosRenewJitter(kerberosRenewJitter);
+    }
+
+    public String getSslKeystoreLocation() {
+        return configuration.getSslKeystoreLocation();
     }
 
-    public void setProducerType(String producerType) {
-        configuration.setProducerType(producerType);
+    public Integer getNoOfMetricsSample() {
+        return configuration.getNoOfMetricsSample();
+    }
+
+    public String getSslKeymanagerAlgorithm() {
+        return configuration.getSslKeymanagerAlgorithm();
+    }
+
+    public void setConsumerId(String consumerId) {
+        configuration.setConsumerId(consumerId);
     }
 
     public String getClientId() {
         return configuration.getClientId();
     }
 
-    public int getFetchMinBytes() {
-        return configuration.getFetchMinBytes();
+    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+        configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
     }
 
-    public String getAutoOffsetReset() {
-        return configuration.getAutoOffsetReset();
+    public String getSslCipherSuites() {
+        return configuration.getSslCipherSuites();
     }
 
-    public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) {
-        configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs);
+    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+        configuration.setRequestRequiredAcks(requestRequiredAcks);
     }
 
-    public void setAutoOffsetReset(String autoOffsetReset) {
-        configuration.setAutoOffsetReset(autoOffsetReset);
+    public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+        configuration.setConnectionMaxIdleMs(connectionMaxIdleMs);
     }
 
-    public void setConsumerId(String consumerId) {
-        configuration.setConsumerId(consumerId);
+    public String getSslTrustmanagerAlgorithm() {
+        return configuration.getSslTrustmanagerAlgorithm();
     }
 
-    public int getRetryBackoffMs() {
-        return configuration.getRetryBackoffMs();
+    public String getSslTruststorePassword() {
+        return configuration.getSslTruststorePassword();
     }
 
-    public int getRebalanceMaxRetries() {
-        return configuration.getRebalanceMaxRetries();
+    public void setTimeoutMs(Integer timeoutMs) {
+        configuration.setTimeoutMs(timeoutMs);
     }
 
-    public Boolean isAutoCommitEnable() {
-        return configuration.isAutoCommitEnable();
+    public void setConsumerStreams(int consumerStreams) {
+        configuration.setConsumerStreams(consumerStreams);
     }
 
-    public void setQueueBufferingMaxMs(int queueBufferingMaxMs) {
-        configuration.setQueueBufferingMaxMs(queueBufferingMaxMs);
+    public String getSslTruststoreType() {
+        return configuration.getSslTruststoreType();
     }
 
-    public void setRebalanceMaxRetries(int rebalanceMaxRetries) {
-        configuration.setRebalanceMaxRetries(rebalanceMaxRetries);
+    public String getSecurityProtocol() {
+        return configuration.getSecurityProtocol();
     }
 
-    public int getZookeeperSessionTimeoutMs() {
-        return configuration.getZookeeperSessionTimeoutMs();
+    public void setBufferMemorySize(Integer bufferMemorySize) {
+        configuration.setBufferMemorySize(bufferMemorySize);
     }
 
-    public void setKeySerializerClass(String keySerializerClass) {
-        configuration.setKeySerializerClass(keySerializerClass);
+    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+        configuration.setSaslKerberosServiceName(saslKerberosServiceName);
     }
 
     public void setCompressionCodec(String compressionCodec) {
         configuration.setCompressionCodec(compressionCodec);
     }
 
-    public void setClientId(String clientId) {
-        configuration.setClientId(clientId);
+    public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+        configuration.setKerberosBeforeReloginMinTime(kerberosBeforeReloginMinTime);
     }
 
-    public int getSocketTimeoutMs() {
-        return configuration.getSocketTimeoutMs();
+    public Integer getMetadataMaxAgeMs() {
+        return configuration.getMetadataMaxAgeMs();
     }
 
-    public String getCompressedTopics() {
-        return configuration.getCompressedTopics();
+    public String getSerializerClass() {
+        return configuration.getSerializerClass();
     }
 
-    public int getZookeeperSyncTimeMs() {
-        return configuration.getZookeeperSyncTimeMs();
+    public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+        configuration.setSslKeymanagerAlgorithm(sslKeymanagerAlgorithm);
     }
 
-    public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) {
-        configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes);
+    public void setMaxRequestSize(Integer maxRequestSize) {
+        configuration.setMaxRequestSize(maxRequestSize);
     }
 
-    public int getQueueEnqueueTimeoutMs() {
-        return configuration.getQueueEnqueueTimeoutMs();
+    public Double getKerberosRenewJitter() {
+        return configuration.getKerberosRenewJitter();
     }
 
-    public int getQueueBufferingMaxMessages() {
-        return configuration.getQueueBufferingMaxMessages();
+    public String getPartitionAssignor() {
+        return configuration.getPartitionAssignor();
     }
 
-    public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) {
-        configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs);
+    public Integer getMetadataFetchTimeoutMs() {
+        return configuration.getMetadataFetchTimeoutMs();
     }
 
-    public String getKeySerializerClass() {
-        return configuration.getKeySerializerClass();
+    public void setSecurityProtocol(String securityProtocol) {
+        configuration.setSecurityProtocol(securityProtocol);
     }
 
-    public void setTopicMetadataRefreshIntervalMs(int topicMetadataRefreshIntervalMs) {
-        configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs);
+    public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+        configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
     }
 
-    public void setBatchNumMessages(int batchNumMessages) {
-        configuration.setBatchNumMessages(batchNumMessages);
+    public String getSaslKerberosServiceName() {
+        return configuration.getSaslKerberosServiceName();
     }
 
-    public int getSendBufferBytes() {
-        return configuration.getSendBufferBytes();
+    public void setBatchSize(int batchSize) {
+        configuration.setBatchSize(batchSize);
     }
 
-    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
-        configuration.setRebalanceBackoffMs(rebalanceBackoffMs);
+    public Integer getLingerMs() {
+        return configuration.getLingerMs();
     }
 
-    public void setQueuedMaxMessages(int queuedMaxMessages) {
-        configuration.setQueuedMaxMessageChunks(queuedMaxMessages);
+    public Integer getRetries() {
+        return configuration.getRetries();
     }
 
-    public void setRetryBackoffMs(int retryBackoffMs) {
-        configuration.setRetryBackoffMs(retryBackoffMs);
+    public Integer getMaxPartitionFetchBytes() {
+        return configuration.getMaxPartitionFetchBytes();
     }
 
-    public int getBatchNumMessages() {
-        return configuration.getBatchNumMessages();
+    public String getSslEndpointAlgorithm() {
+        return configuration.getSslEndpointAlgorithm();
     }
 
-    public short getRequestRequiredAcks() {
-        return configuration.getRequestRequiredAcks();
+    public Integer getReconnectBackoffMs() {
+        return configuration.getReconnectBackoffMs();
     }
 
-    public String getProducerType() {
-        return configuration.getProducerType();
+    public void setLingerMs(Integer lingerMs) {
+        configuration.setLingerMs(lingerMs);
     }
 
-    public String getConsumerId() {
-        return configuration.getConsumerId();
+    public void setPartitionAssignor(String partitionAssignor) {
+        configuration.setPartitionAssignor(partitionAssignor);
     }
 
-    public int getMessageSendMaxRetries() {
-        return configuration.getMessageSendMaxRetries();
+    public Integer getRequestTimeoutMs() {
+        return configuration.getRequestTimeoutMs();
     }
 
-    public void setFetchMinBytes(int fetchMinBytes) {
-        configuration.setFetchMinBytes(fetchMinBytes);
+    public Properties createConsumerProperties() {
+        return configuration.createConsumerProperties();
     }
 
-    public String getSerializerClass() {
-        return configuration.getSerializerClass();
+    public void setTopic(String topic) {
+        configuration.setTopic(topic);
     }
 
-    public int getRequestTimeoutMs() {
-        return configuration.getRequestTimeoutMs();
+    public Integer getFetchWaitMaxMs() {
+        return configuration.getFetchWaitMaxMs();
     }
 
-    @Override
-    public boolean isMultipleConsumersSupported() {
-        return true;
+    public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+        configuration.setSessionTimeoutMs(sessionTimeoutMs);
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
+    public void setSslEnabledProtocols(String sslEnabledProtocols) {
+        configuration.setSslEnabledProtocols(sslEnabledProtocols);
     }
 
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
+    public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+        configuration.setHeartbeatIntervalMs(heartbeatIntervalMs);
     }
 
-    public String getOffsetsStorage() {
-        return configuration.getOffsetsStorage();
+    public void setMaxBlockMs(Integer maxBlockMs) {
+        configuration.setMaxBlockMs(maxBlockMs);
     }
 
-    public void setOffsetsStorage(String offsetsStorage) {
-        configuration.setOffsetsStorage(offsetsStorage);
+    public void setSslKeystoreLocation(String sslKeystoreLocation) {
+        configuration.setSslKeystoreLocation(sslKeystoreLocation);
     }
 
-    public Boolean isDualCommitEnabled() {
-        return configuration.isDualCommitEnabled();
+    public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+        configuration.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
+    }
+
+    public void setPartitioner(String partitioner) {
+        configuration.setPartitioner(partitioner);
+    }
+
+    public String getBrokers() {
+        return configuration.getBrokers();
     }
 
-    public void setDualCommitEnabled(boolean dualCommitEnabled) {
-        configuration.setDualCommitEnabled(dualCommitEnabled);
+    public Integer getMetricsSampleWindowMs() {
+        return configuration.getMetricsSampleWindowMs();
     }
 
+    public Integer getSendBufferBytes() {
+        return configuration.getSendBufferBytes();
+    }
+
+    public Integer getTimeoutMs() {
+        return configuration.getTimeoutMs();
+    }
+
+    public String getSslProtocol() {
+        return configuration.getSslProtocol();
+    }
+
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
+     */
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
 }
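
The bulk of KafkaEndpoint is now plain delegation into KafkaConfiguration,
which means every java-client option visible above (session timeouts, SSL,
SASL/Kerberos, and so on) becomes settable directly as an endpoint URI
parameter. A hedged route sketch using a few of those options; the broker,
topic, group id and parameter values are made up for illustration:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaRouteSketch extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // consumersCount controls how many KafkaFetchRecords tasks doStart() submits
            from("kafka:localhost:9092?topic=test&groupId=demo"
                    + "&consumersCount=1&autoCommitEnable=true&autoOffsetReset=earliest")
                .to("log:fromKafka");
        }
    }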

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 06a0317..6f9ea79 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,20 +18,16 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
-/**
- *
- */
-public class KafkaProducer<K, V> extends DefaultProducer {
+public class KafkaProducer extends DefaultProducer {
 
-    protected Producer<K, V> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
@@ -39,26 +35,38 @@ public class KafkaProducer<K, V> extends DefaultProducer {
         this.endpoint = endpoint;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        if (producer != null) {
-            producer.close();
-        }
-    }
-
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         if (endpoint.getBrokers() != null) {
-            props.put("metadata.broker.list", endpoint.getBrokers());
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
         }
         return props;
     }
 
+    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+        return kafkaProducer;
+    }
+
+    /**
+     * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
+     */
+    public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         Properties props = getProps();
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<K, V>(config);
+        if (kafkaProducer == null) {
+            kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
+        }
     }
 
     @Override
@@ -71,26 +79,26 @@ public class KafkaProducer<K, V> extends DefaultProducer {
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+        Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
         boolean hasPartitionKey = partitionKey != null;
 
-        K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
+        Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
         boolean hasMessageKey = messageKey != null;
 
-        V msg = (V) exchange.getIn().getBody();
-        KeyedMessage<K, V> data;
+        Object msg = exchange.getIn().getBody();
 
+        ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
-        } else if (hasPartitionKey) {
-            data = new KeyedMessage<K, V>(topic, partitionKey, msg);
+            record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
         } else if (hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, msg);
+            record = new ProducerRecord(topic, messageKey, msg);
         } else {
             log.warn("No message key or partition key set");
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
+            record = new ProducerRecord(topic, msg);
         }
-        producer.send(data);
+
+        // TODO: add support for async callback in the send
+        kafkaProducer.send(record);
     }
 
 }
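
The producer side maps the Camel headers onto the ProducerRecord constructors:
(topic, partition, key, value) when both the partition key and message key are
set, (topic, key, value) with only a message key, and (topic, value) otherwise.
A minimal sketch of the equivalent raw-client calls; the broker, topic, key and
value strings are illustrative:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DemoSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                // topic + key + value: the hasMessageKey branch above
                producer.send(new ProducerRecord<String, String>("test", "key-1", "hello"));
                // topic + partition + key + value: the hasPartitionKey && hasMessageKey branch
                producer.send(new ProducerRecord<String, String>("test", Integer.valueOf(0), "key-1", "hello"));
            } finally {
                producer.close();
            }
        }
    }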

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 0d2b003..d87f885 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -30,26 +30,23 @@ import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
+
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;
-    
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
     private static volatile int zookeeperPort;
-    
+
     private static volatile int karfkaPort;
-   
+
     @BeforeClass
     public static void beforeClass() {
         // start from somewhere in the 23xxx range
         zookeeperPort = AvailablePortFinder.getNextAvailable(23000);
         // find another free port for the proxy route test
         karfkaPort = AvailablePortFinder.getNextAvailable(24000);
-        
+
         embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort);
         List<Integer> kafkaPorts = new ArrayList<Integer>();
         // -1 for any available port
@@ -60,9 +57,9 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         } catch (IOException e) {
             e.printStackTrace();
         }
-        LOG.info("Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
+        System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
         embeddedKafkaCluster.startup();
-        LOG.info("Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
+        System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
     }
 
     @AfterClass
@@ -70,7 +67,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         embeddedKafkaCluster.shutdown();
         embeddedZookeeper.shutdown();
     }
-    
+
     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
@@ -81,7 +78,6 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         jndi.bind("prop", prop);
         return jndi;
     }
-    
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -89,12 +85,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         context.addComponent("properties", new PropertiesComponent("ref:prop"));
         return context;
     }
-    
 
     protected static int getZookeeperPort() {
         return zookeeperPort;
     }
-    
+
     protected static int getKarfkaPort() {
         return karfkaPort;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index eb6dd09..31c2dd6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -18,14 +18,17 @@ package org.apache.camel.component.kafka;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.Registry;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class KafkaComponentTest {
 
@@ -34,10 +37,6 @@ public class KafkaComponentTest {
     @Test
     public void testPropertiesSet() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("zookeeperHost", "somehost");
-        params.put("zookeeperPort", 2987);
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
         params.put("topic", "mytopic");
         params.put("partitioner", "com.class.Party");
 
@@ -45,97 +44,166 @@ public class KafkaComponentTest {
         String remaining = "broker1:12345,broker2:12566";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
         assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
         assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
         assertEquals("com.class.Party", endpoint.getPartitioner());
     }
 
     @Test
-    public void testZookeeperConnectPropertyOverride() throws Exception {
+    public void testAllProducerConfigProperty() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("zookeeperConnect", "thehost:2181/chroot");
-        params.put("zookeeperHost", "somehost");
-        params.put("zookeeperPort", 2987);
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
+        setProducerProperty(params);
 
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+        String uri = "kafka:dev1:12345,dev2:12566";
+        String remaining = "dev1:12345,dev2:12566";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect());
-        assertNull(endpoint.getZookeeperHost());
-        assertEquals(-1, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
+
+        assertEquals(new Integer(0), endpoint.getRequestRequiredAcks());
+        assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+        assertEquals(new Integer(10), endpoint.getProducerBatchSize());
+        assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
+        assertEquals(new Integer(1), endpoint.getMaxBlockMs());
+        assertEquals(false, endpoint.getBlockOnBufferFull());
+        assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+        assertEquals("testing", endpoint.getClientId());
+        assertEquals("none", endpoint.getCompressionCodec());
+        assertEquals(new Integer(1), endpoint.getLingerMs());
+        assertEquals(new Integer(100), endpoint.getMaxRequestSize());
+        assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
+        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
+        assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
+        assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
+        assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
+        assertEquals(new Integer(0), endpoint.getRetries());
+        assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
+        assertEquals(765, endpoint.getSendBufferBytes().intValue());
+        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
+        assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
+        assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
+        assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
+        assertEquals(new Integer(12344), endpoint.getMetricsSampleWindowMs());
+        assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getSerializerClass());
+        assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getKeySerializerClass());
+        assertEquals("testing", endpoint.getSslKeyPassword());
+        assertEquals("/abc", endpoint.getSslKeystoreLocation());
+        assertEquals("testing", endpoint.getSslKeystorePassword());
+        assertEquals("/abc", endpoint.getSslTruststoreLocation());
+        assertEquals("testing", endpoint.getSslTruststorePassword());
+        assertEquals("test", endpoint.getSaslKerberosServiceName());
+        assertEquals("PLAINTEXT", endpoint.getSecurityProtocol());
+        assertEquals("TLSv1.2", endpoint.getSslEnabledProtocols());
+        assertEquals("JKS", endpoint.getSslKeystoreType());
+        assertEquals("TLS", endpoint.getSslProtocol());
+        assertEquals("test", endpoint.getSslProvider());
+        assertEquals("JKS", endpoint.getSslTruststoreType());
+        assertEquals("/usr/bin/kinit", endpoint.getKerberosInitCmd());
+        assertEquals(new Integer(60000), endpoint.getKerberosBeforeReloginMinTime());
+        assertEquals(new Double(0.05), endpoint.getKerberosRenewJitter());
+        assertEquals(new Double(0.8), endpoint.getKerberosRenewWindowFactor());
+        assertEquals("MAC", endpoint.getSslCipherSuites());
+        assertEquals("test", endpoint.getSslEndpointAlgorithm());
+        assertEquals("SunX509", endpoint.getSslKeymanagerAlgorithm());
+        assertEquals("PKIX", endpoint.getSslTrustmanagerAlgorithm());
     }
 
     @Test
-    public void testPropertiesConfigrationMerge() throws Exception {
+    public void testAllProducerKeys() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
 
-        KafkaConfiguration kc = new KafkaConfiguration();
-        kc.setZookeeperHost("somehost");
-        kc.setZookeeperPort(2987);
-        kc.setTopic("default");
-        params.put("configuration", kc);
-
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+        String uri = "kafka:dev1:12345,dev2:12566";
+        String remaining = "dev1:12345,dev2:12566";
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
-        assertNull("dirty", kc.getBrokers());
-        assertEquals("default", kc.getTopic());
+        assertEquals(endpoint.getConfiguration().createProducerProperties().keySet(), getProducerKeys().keySet());
     }
 
-    @Test
-    public void testPropertiesConfigrationRefMerge() throws Exception {
-        Map<String, Object> params = new HashMap<String, Object>();
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
-
-        KafkaConfiguration kc = new KafkaConfiguration();
-        kc.setZookeeperHost("somehost");
-        kc.setZookeeperPort(2987);
-        kc.setTopic("default");
-        Registry registry = Mockito.mock(Registry.class);
-        Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc);
-        Mockito.when(context.getRegistry()).thenReturn(registry);
-        params.put("configuration", "#baseconf");
-
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+    private Properties getProducerKeys() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+        props.put(ProducerConfig.RETRIES_CONFIG, "0");
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
+        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
+        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
+        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
+        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
+        props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
+        props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
+        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50");
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+        props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, TLSv1");
+        props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        props.put(SaslConfigs.SASL_KERBEROS_KINIT_CMD, "/usr/bin/kinit");
+        props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000");
+        props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05");
+        props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8");
+        props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509");
+        props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX");
+
+        return props;
+    }
 
-        KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
-        assertNull("dirty", kc.getBrokers());
-        assertEquals("default", kc.getTopic());
+    private void setProducerProperty(Map<String, Object> params) {
+        params.put("requestRequiredAcks", 0);
+        params.put("bufferMemorySize", 1);
+        params.put("compressionCodec", "none");
+        params.put("retries", 0);
+        params.put("producerBatchSize", 10);
+        params.put("connectionMaxIdleMs", 12);
+        params.put("lingerMs", 1);
+        params.put("maxBlockMs", 1);
+        params.put("maxRequestSize", 100);
+        params.put("receiveBufferBytes", 23);
+        params.put("requestTimeoutMs", 100);
+        params.put("sendBufferBytes", 765);
+        params.put("timeoutMs", 2045);
+        params.put("blockOnBufferFull", false);
+        params.put("maxInFlightRequest", 1);
+        params.put("metadataFetchTimeoutMs", 9043);
+        params.put("metadataMaxAgeMs", 1029);
+        params.put("reconnectBackoffMs", 234);
+        params.put("retryBackoffMs", 3782);
+        params.put("noOfMetricsSample", 3);
+        params.put("metricReporters", "org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport");
+        params.put("metricsSampleWindowMs", 12344);
+        params.put("clientId", "testing");
+        params.put("sslKeyPassword", "testing");
+        params.put("sslKeystoreLocation", "/abc");
+        params.put("sslKeystorePassword", "testing");
+        params.put("sslTruststoreLocation", "/abc");
+        params.put("sslTruststorePassword", "testing");
+        params.put("saslKerberosServiceName", "test");
+        params.put("securityProtocol", "PLAINTEXT");
+        params.put("sslEnabledProtocols", "TLSv1.2");
+        params.put("sslKeystoreType", "JKS");
+        params.put("sslProtocol", "TLS");
+        params.put("sslProvider", "test");
+        params.put("sslTruststoreType", "JKS");
+        params.put("kerberosInitCmd", "/usr/bin/kinit");
+        params.put("kerberosBeforeReloginMinTime", 60000);
+        params.put("kerberosRenewJitter", 0.05);
+        params.put("kerberosRenewWindowFactor", 0.8);
+        params.put("sslCipherSuites", "MAC");
+        params.put("sslEndpointAlgorithm", "test");
+        params.put("sslKeymanagerAlgorithm", "SunX509");
+        params.put("sslTrustmanagerAlgorithm", "PKIX");
     }
+
 }
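
The defaults asserted in getProducerKeys() above are the stock settings of the new org.apache.kafka.clients.producer.KafkaProducer. For orientation, a minimal standalone sketch of that producer API, with a placeholder broker address and topic rather than the embedded-broker setup the tests use:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ACKS_CONFIG, "1");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                // send() is asynchronous and returns a Future<RecordMetadata>
                producer.send(new ProducerRecord<String, String>("test", "1", "message-0"));
            } finally {
                producer.close();
            }
        }
    }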

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index a6f59c0..9128c62 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -18,13 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,39 +33,38 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
     public static final String TOPIC = "test";
 
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
-            + "&zookeeperHost=localhost"
-            + "&zookeeperPort={{zookeeperPort}}"
             + "&groupId=group1"
-            + "&autoOffsetReset=smallest"
+            + "&autoOffsetReset=earliest"
             + "&autoCommitEnable=false"
             + "&batchSize=3"
             + "&consumerStreams=10"
-            // If the consumerTimeoutMs is set too low, the test will fail on JDK7
-            + "&consumerTimeoutMs=300"
             + "&barrierAwaitTimeoutMs=1000"
-        )
+    )
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    private Producer<String, String> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner");
-        props.put("request.required.acks", "1");
 
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<String, String>(config);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
     }
 
     @After
     public void after() {
-        producer.close();
+        if (producer != null) {
+            producer.close();
+        }
     }
 
     @Override
@@ -74,47 +72,39 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from(from).autoStartup(true).to(to).setId("First");
+                from(from).routeId("foo").to(to).setId("First");
             }
         };
     }
 
     @Test
     public void kafkaMessagesIsConsumedByCamel() throws Exception {
+
         //First 2 must not be committed since batch size is 3
         to.expectedBodiesReceivedInAnyOrder("m1", "m2");
         for (int k = 1; k <= 2; k++) {
             String msg = "m" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
             producer.send(data);
         }
-        to.assertIsSatisfied(3000);
-        
+        to.assertIsSatisfied();
+
         to.reset();
+
+        to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
+
         //Restart the route
-        from.getCamelContext().stop();
-        from.getCamelContext().start();
-        
-        to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
+        context.stopRoute("foo");
+        context.startRoute("foo");
 
         //Second route must wake up and consume all from scratch and commit 9 consumed
         for (int k = 3; k <= 10; k++) {
             String msg = "m" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
             producer.send(data);
         }
 
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-        //Restart endpoint,
-        from.getCamelContext().stop();
-        from.getCamelContext().start();
-
-        
-        //Only one message should be left for this consumer group to consume
-        to.expectedMessageCount(1);
-        to.assertIsSatisfied(3000);
+        to.assertIsSatisfied();
     }
 }
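
The consumer half of the migration follows the same shape: the old ConsumerConnector/KafkaStream pair gives way to org.apache.kafka.clients.consumer.KafkaConsumer driven by a poll loop. A minimal sketch of that pattern, using the same option values as the endpoint URI above and a placeholder broker address:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            try {
                consumer.subscribe(Arrays.asList("test"));
                // Each poll returns the records fetched since the previous call
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            } finally {
                consumer.close();
            }
        }
    }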
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index c49444b..4f8a0fd 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -19,46 +19,49 @@ package org.apache.camel.component.kafka;
 import java.io.IOException;
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}"
-        + "&groupId=group1&autoOffsetReset=smallest")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+            + "&groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+            + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    private Producer<String, String> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner");
-        props.put("request.required.acks", "1");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
 
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<String, String>(config);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
     }
 
     @After
     public void after() {
-        producer.close();
+        if (producer != null) {
+            producer.close();
+        }
     }
 
     @Override
@@ -78,10 +81,11 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
             producer.send(data);
         }
         to.assertIsSatisfied(3000);
     }
+
 }
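
With the new client the consumer options travel directly on the endpoint URI, as the test above shows. A minimal route sketch reusing those option names (host and port are placeholders for the embedded broker):

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaRouteSketch extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            from("kafka:localhost:9092?topic=test&groupId=group1&autoOffsetReset=earliest"
                    + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                    + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer")
                .to("mock:result");
        }
    }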
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index b51c09e..86bc163 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -28,21 +28,21 @@ public class KafkaConsumerTest {
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
-    public void consumerRequiresZookeeperConnect() throws Exception {
+    public void consumerRequiresBootstrapServers() throws Exception {
         Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
-        Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234");
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test
-    public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception {
+    public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
         Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
-        Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181");
         new KafkaConsumer(endpoint, processor);
     }
 }
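
These tests pin down the new startup contract: a consumer needs bootstrap brokers and a group id, nothing more, where it previously needed a ZooKeeper connect string. A hypothetical sketch of the constructor-time checks they exercise (structure and messages assumed, not copied from the camel-kafka source):

    // Hypothetical validation matching the tests above
    static void validate(KafkaEndpoint endpoint) {
        if (endpoint.getBrokers() == null) {
            throw new IllegalArgumentException("Brokers must be configured");
        }
        if (endpoint.getGroupId() == null) {
            throw new IllegalArgumentException("GroupId must be configured");
        }
    }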

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index be16766..bd25886 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -18,11 +18,8 @@ package org.apache.camel.component.kafka;
 
 import java.net.URISyntaxException;
 
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
-
-import kafka.serializer.DefaultDecoder;
 import org.apache.camel.Exchange;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,12 +32,8 @@ public class KafkaEndpointTest {
         KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
         endpoint.setBrokers("localhost");
 
-        Message message = new Message("mymessage".getBytes(), "somekey".getBytes());
-        DefaultDecoder decoder = new DefaultDecoder(null);
-        MessageAndMetadata<byte[], byte[]> mm =
-                new MessageAndMetadata<byte[], byte[]>("topic", 4, message, 56, decoder, decoder);
-
-        Exchange exchange = endpoint.createKafkaExchange(mm);
+        ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 4, 56, "somekey", "");
+        Exchange exchange = endpoint.createKafkaExchange(record);
         assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY));
         assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC));
         assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));
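
createKafkaExchange now maps a ConsumerRecord straight onto the Camel message, which is what the three assertions verify. A hypothetical sketch of that mapping (field-to-header layout inferred from the assertions, not copied from the component source):

    import org.apache.camel.Exchange;
    import org.apache.camel.Message;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical mapping consistent with the assertions above
    void populateExchange(Exchange exchange, ConsumerRecord<String, String> record) {
        Message in = exchange.getIn();
        in.setHeader(KafkaConstants.KEY, record.key());
        in.setHeader(KafkaConstants.TOPIC, record.topic());
        in.setHeader(KafkaConstants.PARTITION, record.partition());
        in.setBody(record.value());
    }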


[4/4] camel git commit: CAMEL-9467: Adjust feature for camel-kafka

Posted by da...@apache.org.
CAMEL-9467: Adjust feature for camel-kafka


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/621a704c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/621a704c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/621a704c

Branch: refs/heads/master
Commit: 621a704c455fbd2c3ae6e2ea2b21c0e7da5e5f23
Parents: b2aa831
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 10:09:38 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:09:38 2016 +0100

----------------------------------------------------------------------
 platforms/karaf/features/src/main/resources/features.xml | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/621a704c/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 0062335..5809e92 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -973,13 +973,9 @@
     <bundle>mvn:org.apache.camel/camel-jxpath/${project.version}</bundle>
   </feature>
   <feature name='camel-kafka' version='${project.version}' resolver='(obr)' start-level='50'>
-    <bundle dependency='true'>mvn:org.scala-lang/scala-library/${scala-2.10-version}</bundle>
-    <bundle dependency='true'>mvn:org.apache.zookeeper/zookeeper/${zookeeper-version}</bundle>
-    <bundle dependency="true">wrap:mvn:com.101tec/zkclient/${zkclient-version}$Bundle-Version=${zkclient-version}&amp;Export-Package=*;-noimport:=true;version="${zkclient-version}"</bundle>
-    <bundle dependency='true'>wrap:mvn:com.yammer.metrics/metrics-core/${yammer-metrics-version}$Bundle-Version=${yammer-metrics-version}&amp;Export-Package=*;-noimport:=true;version="${yammer-metrics-version}"</bundle>
-    <bundle dependency='true'>wrap:mvn:com.yammer.metrics/metrics-annotation/${yammer-metrics-version}$Bundle-Version=${yammer-metrics-version}&amp;Export-Package=*;-noimport:=true;version="${yammer-metrics-version}"</bundle>
-    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/${kafka-clients-bundle-version}</bundle>
-    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka_2.10/${kafka-bundle-version}</bundle>
+    <bundle dependency='true'>wrap:mvn:org.apache.kafka/kafka-clients/${kafka-version}</bundle>
+    <bundle dependency='true'>mvn:org.xerial.snappy/snappy-java/${snappy-version}</bundle>
+    <bundle dependency='true'>wrap:mvn:net.jpountz.lz4/lz4/1.2.0</bundle>
     <feature version='${project.version}'>camel-core</feature>
     <bundle>mvn:org.apache.camel/camel-kafka/${project.version}</bundle>
   </feature>