You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/27 08:02:40 UTC
[3/3] git commit: #121: KafkaProducer: lookup the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS.
#121: KafkaProducer: lookup the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02f2945c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02f2945c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02f2945c
Branch: refs/heads/camel-2.13.x
Commit: 02f2945cd0d290a9e9427ee7cb8d9c2be560b0f2
Parents: 6af7f21
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 27 08:04:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 27 08:05:49 2014 +0100
----------------------------------------------------------------------
.../camel/component/kafka/KafkaProducer.java | 8 ++-
.../camel/component/kafka/KafkaConsumerIT.java | 2 +-
.../camel/component/kafka/KafkaProducerIT.java | 75 +++++++++++++-------
.../component/kafka/KafkaProducerTest.java | 40 +++++++++++
4 files changed, 98 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 66440f3..6c2d167 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -69,9 +69,15 @@ public class KafkaProducer extends DefaultProducer {
if (partitionKey == null) {
throw new CamelExchangeException("No partition key set", exchange);
}
+
+ String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+ if (topic == null) {
+ throw new CamelExchangeException("No topic key set", exchange);
+ }
+
String msg = exchange.getIn().getBody(String.class);
- KeyedMessage<String, String> data = new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg);
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index a8ca6c3..5a4baf7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,7 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
@Test
public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
to.expectedMessageCount(5);
- to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" );
+ to.expectedBodiesReceived("message-0", "message-1", "message-2", "message-3", "message-4");
for (int k = 0; k < 5; k++) {
String msg = "message-" + k;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 5805666..85fa272 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -17,13 +17,14 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -47,6 +48,7 @@ import org.junit.Test;
public class KafkaProducerIT extends CamelTestSupport {
public static final String TOPIC = "test";
+ public static final String TOPIC_IN_HEADER = "testHeader";
@EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
private Endpoint to;
@@ -86,40 +88,63 @@ public class KafkaProducerIT extends CamelTestSupport {
@Test
public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException {
+ int messageInTopic = 10;
+ int messageInOtherTopic = 5;
- final List<String> messages = new ArrayList<String>();
+ CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, 5);
+ topicCountMap.put(TOPIC_IN_HEADER, 5);
+ createKafkaMessageConsumer(messagesLatch, topicCountMap);
+
+ sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+ sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER);
+
+ boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+ assertTrue("Not all messages were published to the kafka topics", allMessagesReceived);
+ }
+
+ private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
- List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
-
- ExecutorService executor = Executors.newFixedThreadPool(5);
- for (final KafkaStream stream : streams) {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- String msg = new String(it.next().message());
- messages.add(msg);
- }
- }
- });
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+ executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+ }
+ for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+ executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+ }
+ }
+
+ private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) {
+ Map<String, Object> headerMap = new HashMap<String, Object>();
+ for (int i = 0; i < headersWithValue.length; i = i + 2) {
+ headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
}
- for (int k = 0; k < 10; k++) {
- template.sendBodyAndHeader("IT test message", KafkaConstants.PARTITION_KEY, "1");
+ for (int k = 0; k < messageInOtherTopic; k++) {
+ template.sendBodyAndHeaders(bodyOther, headerMap);
}
+ }
- for (int k = 0; k < 20; k++) {
- if (messages.size() == 10) {
- return;
- }
- Thread.sleep(200);
+ private static class KakfaTopicConsumer implements Runnable {
+ private final KafkaStream stream;
+ private final CountDownLatch latch;
+
+ public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+ this.stream = stream;
+ this.latch = latch;
}
- fail();
+ @Override
+ public void run() {
+ ConsumerIterator<byte[], byte[]> it = stream.iterator();
+ while (it.hasNext()) {
+ String msg = new String(it.next().message());
+ latch.countDown();
+ }
+ }
}
}
-
http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index ccaaab5..acdfc60 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -70,6 +70,46 @@ public class KafkaProducerTest {
Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
}
+ @Test
+ public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
+
+ endpoint.setTopic(null);
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+ in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+ producer.process(exchange);
+
+ ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+ Mockito.verify(producer.producer).send(captor.capture());
+ assertEquals("4", captor.getValue().key());
+ assertEquals("anotherTopic", captor.getValue().topic());
+ }
+
+ @Test
+ public void processSendsMesssageWithTopicHeaderAndEndPoint() throws Exception {
+
+ endpoint.setTopic("sometopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+ in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+
+ producer.process(exchange);
+
+ ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
+ Mockito.verify(producer.producer).send(captor.capture());
+ assertEquals("4", captor.getValue().key());
+ assertEquals("anotherTopic", captor.getValue().topic());
+ }
+
+ @Test(expected = CamelException.class)
+ public void processRequiresTopicInEndpointOrInHeader() throws Exception {
+ endpoint.setTopic(null);
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+ producer.process(exchange);
+ }
+
@Test(expected = CamelException.class)
public void processRequiresPartitionHeader() throws Exception {
endpoint.setTopic("sometopic");