You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/16 17:45:51 UTC
camel git commit: CAMEL-8790: Kafka producer hard coded to use
Strings. Thanks to Mark Mindenhall for the patch.
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 9e2cb5fd6 -> 6cb8770ee
CAMEL-8790: Kafka producer hard coded to use Strings. Thanks to Mark Mindenhall for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cb8770e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cb8770e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cb8770e
Branch: refs/heads/camel-2.15.x
Commit: 6cb8770ee1d330ca158e4d5b9bab6f231403e120
Parents: 9e2cb5f
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jul 16 17:50:21 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jul 16 17:52:29 2015 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConstants.java | 3 +
.../camel/component/kafka/KafkaEndpoint.java | 16 ++-
.../camel/component/kafka/KafkaProducer.java | 26 ++--
.../component/kafka/KafkaProducerFullTest.java | 135 +++++++++++++------
4 files changed, 129 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 18f1da0..6c31b65 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -28,6 +28,9 @@ public final class KafkaConstants {
public static final String KEY = "kafka.CONTENT_TYPE";
public static final String TOPIC = "kafka.TOPIC";
+ public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
+ public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+
private KafkaConstants() {
// Utility class
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1ce325b..8dde7c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -76,7 +76,18 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
@Override
public Producer createProducer() throws Exception {
- return new KafkaProducer(this);
+ String msgClassName = getConfiguration().getSerializerClass();
+ String keyClassName = getConfiguration().getKeySerializerClass();
+ if (msgClassName == null) {
+ msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER;
+ }
+ if (keyClassName == null) {
+ keyClassName = msgClassName;
+ }
+
+ Class k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName);
+ Class v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName);
+ return createProducer(k, v, this);
}
@Override
@@ -103,6 +114,9 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
return exchange;
}
+ protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) {
+ return new KafkaProducer<K, V>(endpoint);
+ }
// Delegated properties from the configuration
//-------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index b832740..0fde1ae 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -29,9 +29,9 @@ import org.apache.camel.impl.DefaultProducer;
/**
*
*/
-public class KafkaProducer extends DefaultProducer {
+public class KafkaProducer<K, V> extends DefaultProducer {
- protected Producer<String, String> producer;
+ protected Producer<K, V> producer;
private final KafkaEndpoint endpoint;
public KafkaProducer(KafkaEndpoint endpoint) {
@@ -58,30 +58,34 @@ public class KafkaProducer extends DefaultProducer {
protected void doStart() throws Exception {
Properties props = getProps();
ProducerConfig config = new ProducerConfig(props);
- producer = new Producer<String, String>(config);
+ producer = new Producer<K, V>(config);
}
@Override
+ @SuppressWarnings("unchecked")
public void process(Exchange exchange) throws CamelException {
String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);
}
- String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class);
+ K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
boolean hasPartitionKey = partitionKey != null;
- String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class);
+
+ K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
boolean hasMessageKey = messageKey != null;
- String msg = exchange.getIn().getBody(String.class);
- KeyedMessage<String, String> data;
+
+ V msg = (V) exchange.getIn().getBody();
+ KeyedMessage<K, V> data;
+
if (hasPartitionKey && hasMessageKey) {
- data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+ data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
} else if (hasPartitionKey) {
- data = new KeyedMessage<String, String>(topic, partitionKey, msg);
+ data = new KeyedMessage<K, V>(topic, partitionKey, msg);
} else if (hasMessageKey) {
- data = new KeyedMessage<String, String>(topic, messageKey, msg);
+ data = new KeyedMessage<K, V>(topic, messageKey, msg);
} else {
log.warn("No message key or partition key set");
- data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+ data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
}
producer.send(data);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6cb8770e/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 56e0eb2..d76a059 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -36,97 +36,154 @@ import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
- public static final String TOPIC = "test";
- public static final String TOPIC_IN_HEADER = "testHeader";
+ private static final String TOPIC_STRINGS = "test";
+ private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
+ private static final String TOPIC_BYTES = "testBytes";
+ private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
+ private static final String GROUP_STRINGS = "groupStrings";
+ private static final String GROUP_BYTES = "groupStrings";
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
- @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+ private static ConsumerConnector stringsConsumerConn;
+ private static ConsumerConnector bytesConsumerConn;
+
+ @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
+ "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
+ "&requestRequiredAcks=-1")
- private Endpoint to;
+ private Endpoint toStrings;
- @Produce(uri = "direct:start")
- private ProducerTemplate template;
+ @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1")
+ private Endpoint toBytes;
- private ConsumerConnector kafkaConsumer;
+ @Produce(uri = "direct:startStrings")
+ private ProducerTemplate stringsTemplate;
- @Before
- public void before() {
- Properties props = new Properties();
-
- props.put("zookeeper.connect", "localhost:" + getZookeeperPort());
- props.put("group.id", KafkaConstants.DEFAULT_GROUP);
- props.put("zookeeper.session.timeout.ms", "6000");
- props.put("zookeeper.connectiontimeout.ms", "12000");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- props.put("auto.offset.reset", "smallest");
+ @Produce(uri = "direct:startBytes")
+ private ProducerTemplate bytesTemplate;
+
+
+ @BeforeClass
+ public static void before() {
+ Properties stringsProps = new Properties();
- kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+ stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort());
+ stringsProps.put("group.id", GROUP_STRINGS);
+ stringsProps.put("zookeeper.session.timeout.ms", "6000");
+ stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
+ stringsProps.put("zookeeper.sync.time.ms", "200");
+ stringsProps.put("auto.commit.interval.ms", "1000");
+ stringsProps.put("auto.offset.reset", "smallest");
+ stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps));
+
+ Properties bytesProps = new Properties();
+ bytesProps.putAll(stringsProps);
+ bytesProps.put("group.id", GROUP_BYTES);
+ bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps));
}
- @After
- public void after() {
- kafkaConsumer.shutdown();
+ @AfterClass
+ public static void after() {
+ stringsConsumerConn.shutdown();
+ bytesConsumerConn.shutdown();
}
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:start").to(to);
+ protected RouteBuilder[] createRouteBuilders() throws Exception {
+ return new RouteBuilder[] {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:startStrings").to(toStrings);
+ }
+ },
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:startBytes").to(toBytes);
+ }
}
};
}
@Test
- public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException {
+ public void producedStringMessageIsReceivedByKafka() throws InterruptedException, IOException {
+ int messageInTopic = 10;
+ int messageInOtherTopic = 5;
+
+ CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
+
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(TOPIC_STRINGS, 5);
+ topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
+ createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
+
+ sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
+ sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+ boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+ assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+ }
+
+ @Test
+ public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
int messageInOtherTopic = 5;
CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(TOPIC, 5);
- topicCountMap.put(TOPIC_IN_HEADER, 5);
- createKafkaMessageConsumer(messagesLatch, topicCountMap);
+ topicCountMap.put(TOPIC_BYTES, 5);
+ topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
+ createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
- sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1");
- sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER);
+ Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
+ inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+ sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
+
+ Map<String, Object> otherTopicHeaders = new HashMap<String, Object>();
+ otherTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
+ otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
+ sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
}
- private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
+ private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader,
+ CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap);
ExecutorService executor = Executors.newFixedThreadPool(10);
- for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) {
+ for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
}
- for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER)) {
+ for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) {
executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
}
}
- private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... headersWithValue) {
+ private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
Map<String, Object> headerMap = new HashMap<String, Object>();
for (int i = 0; i < headersWithValue.length; i = i + 2) {
headerMap.put(headersWithValue[i], headersWithValue[i + 1]);
}
+ sendMessagesInRoute(messages, template, bodyOther, headerMap);
+ }
- for (int k = 0; k < messageInOtherTopic; k++) {
+ private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, Map<String, Object> headerMap) {
+ for (int k = 0; k < messages; k++) {
template.sendBodyAndHeaders(bodyOther, headerMap);
}
}