You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/16 17:45:51 UTC

camel git commit: CAMEL-8790: Kafka producer hard coded to use Strings. Thanks to Mark Mindenhall for the patch.

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 9e2cb5fd6 -> 6cb8770ee


CAMEL-8790: Kafka producer hard coded to use Strings. Thanks to Mark Mindenhall for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cb8770e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cb8770e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cb8770e

Branch: refs/heads/camel-2.15.x
Commit: 6cb8770ee1d330ca158e4d5b9bab6f231403e120
Parents: 9e2cb5f
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jul 16 17:50:21 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jul 16 17:52:29 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConstants.java   |   3 +
 .../camel/component/kafka/KafkaEndpoint.java    |  16 ++-
 .../camel/component/kafka/KafkaProducer.java    |  26 ++--
 .../component/kafka/KafkaProducerFullTest.java  | 135 +++++++++++++------
 4 files changed, 129 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 18f1da0..6c31b65 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -28,6 +28,9 @@ public final class KafkaConstants {
     public static final String KEY = "kafka.CONTENT_TYPE";
     public static final String TOPIC = "kafka.TOPIC";
 
+    public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
+    public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+
     private KafkaConstants() {
         // Utility class
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1ce325b..8dde7c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -76,7 +76,18 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KafkaProducer(this);
+        String msgClassName = getConfiguration().getSerializerClass();
+        String keyClassName = getConfiguration().getKeySerializerClass();
+        if (msgClassName == null) {
+            msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER;
+        }
+        if (keyClassName == null) {
+            keyClassName = msgClassName;
+        }
+
+        Class k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName);
+        Class v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName);
+        return createProducer(k, v, this);
     }
 
     @Override
@@ -103,6 +114,9 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return exchange;
     }
 
+    protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) {
+        return new KafkaProducer<K, V>(endpoint);
+    }
 
     // Delegated properties from the configuration
     //-------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index b832740..0fde1ae 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -29,9 +29,9 @@ import org.apache.camel.impl.DefaultProducer;
 /**
  *
  */
-public class KafkaProducer extends DefaultProducer {
+public class KafkaProducer<K, V> extends DefaultProducer {
 
-    protected Producer<String, String> producer;
+    protected Producer<K, V> producer;
     private final KafkaEndpoint endpoint;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
@@ -58,30 +58,34 @@ public class KafkaProducer extends DefaultProducer {
     protected void doStart() throws Exception {
         Properties props = getProps();
         ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<String, String>(config);
+        producer = new Producer<K, V>(config);
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws CamelException {
         String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class);
+        K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
         boolean hasPartitionKey = partitionKey != null;
-        String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class);
+
+        K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
         boolean hasMessageKey = messageKey != null;
-        String msg = exchange.getIn().getBody(String.class);
-        KeyedMessage<String, String> data;
+
+        V msg = (V) exchange.getIn().getBody();
+        KeyedMessage<K, V> data;
+
         if (hasPartitionKey && hasMessageKey) {
-            data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
         } else if (hasPartitionKey) {
-            data = new KeyedMessage<String, String>(topic, partitionKey, msg);
+            data = new KeyedMessage<K, V>(topic, partitionKey, msg);
         } else if (hasMessageKey) {
-            data = new KeyedMessage<String, String>(topic, messageKey, msg);
+            data = new KeyedMessage<K, V>(topic, messageKey, msg);
         } else {
             log.warn("No message key or partition key set");
-            data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
         }
         producer.send(data);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 56e0eb2..d76a059 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -36,97 +36,154 @@ import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     
-    public static final String TOPIC = "test";
-    public static final String TOPIC_IN_HEADER = "testHeader";
+    private static final String TOPIC_STRINGS = "test";
+    private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
+    private static final String TOPIC_BYTES = "testBytes";
+    private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
+    private static final String GROUP_STRINGS = "groupStrings";
+    private static final String GROUP_BYTES = "groupBytes"; // distinct from GROUP_STRINGS so each connector gets its own group
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC 
+    private static ConsumerConnector stringsConsumerConn;
+    private static ConsumerConnector bytesConsumerConn;
+
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
         + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
         + "&requestRequiredAcks=-1")
-    private Endpoint to;
+    private Endpoint toStrings;
 
-    @Produce(uri = "direct:start")
-    private ProducerTemplate template;
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1")
+    private Endpoint toBytes;
 
-    private ConsumerConnector kafkaConsumer;
+    @Produce(uri = "direct:startStrings")
+    private ProducerTemplate stringsTemplate;
 
-    @Before
-    public void before() {
-        Properties props = new Properties();
-       
-        props.put("zookeeper.connect", "localhost:" + getZookeeperPort());
-        props.put("group.id", KafkaConstants.DEFAULT_GROUP);
-        props.put("zookeeper.session.timeout.ms", "6000");
-        props.put("zookeeper.connectiontimeout.ms", "12000");
-        props.put("zookeeper.sync.time.ms", "200");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("auto.offset.reset", "smallest");
+    @Produce(uri = "direct:startBytes")
+    private ProducerTemplate bytesTemplate;
+
+
+    @BeforeClass
+    public static void before() {
+        Properties stringsProps = new Properties();
        
-        kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+        stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort());
+        stringsProps.put("group.id", GROUP_STRINGS);
+        stringsProps.put("zookeeper.session.timeout.ms", "6000");
+        stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
+        stringsProps.put("zookeeper.sync.time.ms", "200");
+        stringsProps.put("auto.commit.interval.ms", "1000");
+        stringsProps.put("auto.offset.reset", "smallest");
+        stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps));
+
+        Properties bytesProps = new Properties();
+        bytesProps.putAll(stringsProps);
+        bytesProps.put("group.id", GROUP_BYTES);
+        bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps));
     }
 
-    @After
-    public void after() {
-        kafkaConsumer.shutdown();
+    @AfterClass
+    public static void after() {
+        stringsConsumerConn.shutdown();
+        bytesConsumerConn.shutdown();
     }
 
     @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start").to(to);
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        return new RouteBuilder[] {
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:startStrings").to(toStrings);
+                }
+            },
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("direct:startBytes").to(toBytes);
+                }
             }
         };
     }
 
     @Test
-    public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException {
+    public void producedStringMessageIsReceivedByKafka() throws InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
+
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(TOPIC_STRINGS, 5);
+        topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
+
+        sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+    }
+
+    @Test
+    public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
         int messageInTopic = 10;
         int messageInOtherTopic = 5;
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC, 5);
-        topicCountMap.put(TOPIC_IN_HEADER, 5);
-        createKafkaMessageConsumer(messagesLatch, topicCountMap);
+        topicCountMap.put(TOPIC_BYTES, 5);
+        topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
+        createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
 
-        sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1");
-        sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER);
+        Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
+        inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+        sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
+
+        Map<String, Object> otherTopicHeaders = new HashMap<String, Object>();
+        otherTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+        otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
+        sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
 
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
-    private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
+    private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader,
+                                            CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap);
 
         ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) {
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
     }
 
-    private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) {
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
         Map<String, Object> headerMap = new HashMap<String, Object>();
         for (int i = 0; i < headersWithValue.length; i = i + 2) {
             headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
         }
+        sendMessagesInRoute(messages, template, bodyOther, headerMap);
+    }
 
-        for (int k = 0; k < messageInOtherTopic; k++) {
+    private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
+        for (int k = 0; k < messages; k++) {
             template.sendBodyAndHeaders(bodyOther, headerMap);
         }
     }