You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2016/05/11 03:55:37 UTC
camel git commit: CAMEL-9957 camel-kafka producer sends the message in an async way
Repository: camel
Updated Branches:
refs/heads/master 2657ce788 -> 123f08fa4
CAMEL-9957 camel-kafka producer sends the message in an async way
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/123f08fa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/123f08fa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/123f08fa
Branch: refs/heads/master
Commit: 123f08fa4c7161167e986fed33f318e7b5ad460a
Parents: 2657ce7
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed May 11 11:55:13 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed May 11 11:55:13 2016 +0800
----------------------------------------------------------------------
.../camel/component/kafka/KafkaProducer.java | 54 +++++++++++++++++---
.../component/kafka/KafkaProducerTest.java | 52 +++++++++++++++++++
2 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/123f08fa/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4f6468b..0c4013f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,14 +18,18 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
-public class KafkaProducer extends DefaultProducer {
+public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
@@ -69,9 +73,7 @@ public class KafkaProducer extends DefaultProducer {
}
}
- @Override
- @SuppressWarnings("unchecked")
- public void process(Exchange exchange) throws CamelException {
+ protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
String topic = endpoint.getTopic();
if (!endpoint.isBridgeEndpoint()) {
topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
@@ -96,9 +98,15 @@ public class KafkaProducer extends DefaultProducer {
log.warn("No message key or partition key set");
record = new ProducerRecord(topic, msg);
}
+ return record;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void process(Exchange exchange) throws CamelException {
- // TODO: add support for async callback
- // requires a thread pool for processing outgoing routing
+ ProducerRecord record = createRecorder(exchange);
+ // Send the record synchronously, blocking until the send completes
try {
kafkaProducer.send(record).get();
} catch (Exception e) {
@@ -106,4 +114,38 @@ public class KafkaProducer extends DefaultProducer {
}
}
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ ProducerRecord record = createRecorder(exchange);
+ kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
+ // Processing finishes asynchronously; the Kafka callback signals completion
+ return false;
+ } catch (Exception ex) {
+ // Record the failure on the exchange and complete synchronously
+ exchange.setException(ex);
+ callback.done(true);
+ return true;
+ }
+ }
+
+ class KafkaProducerCallBack implements Callback {
+
+ private Exchange exchange;
+ private AsyncCallback callback;
+
+ KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ if (e != null) {
+ // Record the send failure on the exchange
+ exchange.setException(e);
+ }
+ callback.done(false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/123f08fa/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 8e94320..62ab51f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -19,13 +19,16 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
import java.util.concurrent.Future;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.ApiException;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
@@ -40,6 +43,7 @@ public class KafkaProducerTest {
private Exchange exchange = Mockito.mock(Exchange.class);
private Message in = new DefaultMessage();
+ private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
@SuppressWarnings({"unchecked"})
public KafkaProducerTest() throws Exception {
@@ -75,6 +79,54 @@ public class KafkaProducerTest {
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
}
+ @Test(expected=CamelException.class)
+ @SuppressWarnings({"unchecked"})
+ public void processSendsMessageWithException() throws Exception {
+ endpoint.setTopic("sometopic");
+ // Set up the mocked Kafka producer to throw on send
+ org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+ Mockito.when(kp.send(Mockito.any())).thenThrow(new ApiException());
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+ producer.process(exchange);
+
+ }
+
+ @Test
+ public void processAsyncSendsMessage() throws Exception {
+ endpoint.setTopic("sometopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+ producer.process(exchange, callback);
+
+ Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+
+ }
+
+
+ @Test
+ public void processAsyncSendsMessageWithException() throws Exception {
+
+ endpoint.setTopic("sometopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+
+ // Set up the mocked Kafka producer to throw on send
+ org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+ Mockito.when(kp.send(Mockito.any(), Mockito.any())).thenThrow(new ApiException());
+
+ in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+ producer.process(exchange, callback);
+
+ Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+ Mockito.verify(exchange).setException(Matchers.isA(ApiException.class));
+ Mockito.verify(callback).done(Matchers.eq(true));
+ }
+
+
@Test
public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
endpoint.setTopic(null);