You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/27 08:02:40 UTC

[3/3] git commit: #121: KafkaProducer: look up the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS (checkstyle).

#121: KafkaProducer: look up the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS (checkstyle).


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02f2945c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02f2945c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02f2945c

Branch: refs/heads/camel-2.13.x
Commit: 02f2945cd0d290a9e9427ee7cb8d9c2be560b0f2
Parents: 6af7f21
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 08:04:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 08:05:49 2014 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    |  8 ++-
 .../camel/component/kafka/KafkaConsumerIT.java  |  2 +-
 .../camel/component/kafka/KafkaProducerIT.java  | 75 +++++++++++++-------
 .../component/kafka/KafkaProducerTest.java      | 40 +++++++++++
 4 files changed, 98 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 66440f3..6c2d167 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -69,9 +69,15 @@ public class KafkaProducer extends DefaultProducer {
         if (partitionKey == null) {
             throw new CamelExchangeException("No partition key set", exchange);
         }
+
+        String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        if (topic == null) {
+            throw new CamelExchangeException("No topic key set", exchange);
+        }
+
         String msg = exchange.getIn().getBody(String.class);
 
-        KeyedMessage<String, String> data = new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg);
+        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
         producer.send(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index a8ca6c3..5a4baf7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,7 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
     @Test
     public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
-        to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" );
+        to.expectedBodiesReceived("message-0", "message-1", "message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);

http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 5805666..85fa272 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -17,13 +17,14 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -47,6 +48,7 @@ import org.junit.Test;
 public class KafkaProducerIT extends CamelTestSupport {
 
     public static final String TOPIC = "test";
+    public static final String TOPIC_IN_HEADER = "testHeader";
 
     @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
     private Endpoint to;
@@ -86,40 +88,63 @@ public class KafkaProducerIT extends CamelTestSupport {
 
     @Test
     public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
 
-        final List<String> messages = new ArrayList<String>();
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         topicCountMap.put(TOPIC, 5);
+        topicCountMap.put(TOPIC_IN_HEADER, 5);
+        createKafkaMessageConsumer(messagesLatch, topicCountMap);
+
+        sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics", allMessagesReceived);
+    }
+
+    private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
-
-        ExecutorService executor = Executors.newFixedThreadPool(5);
-        for (final KafkaStream stream : streams) {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
-                    while (it.hasNext()) {
-                        String msg = new String(it.next().message());
-                        messages.add(msg);
-                    }
-                }
-            });
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        }
+        for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        }
+    }
+
+    private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) {
+        Map<String, Object> headerMap = new HashMap<String, Object>();
+        for (int i = 0; i < headersWithValue.length; i = i + 2) {
+            headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
         }
 
-        for (int k = 0; k < 10; k++) {
-            template.sendBodyAndHeader("IT test message", KafkaConstants.PARTITION_KEY, "1");
+        for (int k = 0; k < messageInOtherTopic; k++) {
+            template.sendBodyAndHeaders(bodyOther, headerMap);
         }
+    }
 
-        for (int k = 0; k < 20; k++) {
-            if (messages.size() == 10) {
-                return;
-            }
-            Thread.sleep(200);
+    private static class KakfaTopicConsumer implements Runnable {
+        private final KafkaStream stream;
+        private final CountDownLatch latch;
+
+        public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+            this.stream = stream;
+            this.latch = latch;
         }
 
-        fail();
+        @Override
+        public void run() {
+            ConsumerIterator<byte[], byte[]> it = stream.iterator();
+            while (it.hasNext()) {
+                String msg = new String(it.next().message());
+                latch.countDown();
+            }
+        }
     }
 }
-

http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index ccaaab5..acdfc60 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -70,6 +70,46 @@ public class KafkaProducerTest {
         Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
     }
 
+    @Test
+    public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
+
+        endpoint.setTopic(null);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+        producer.process(exchange);
+
+        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+        Mockito.verify(producer.producer).send(captor.capture());
+        assertEquals("4", captor.getValue().key());
+        assertEquals("anotherTopic", captor.getValue().topic());
+    }
+
+    @Test
+    public void processSendsMesssageWithTopicHeaderAndEndPoint() throws Exception {
+
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+        producer.process(exchange);
+
+        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+        Mockito.verify(producer.producer).send(captor.capture());
+        assertEquals("4", captor.getValue().key());
+        assertEquals("anotherTopic", captor.getValue().topic());
+    }
+
+    @Test(expected = CamelException.class)
+    public void processRequiresTopicInEndpointOrInHeader() throws Exception {
+        endpoint.setTopic(null);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        producer.process(exchange);
+    }
+
     @Test(expected = CamelException.class)
     public void processRequiresPartitionHeader() throws Exception {
         endpoint.setTopic("sometopic");