You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2016/05/11 03:55:37 UTC

camel git commit: CAMEL-9957 camel-kafka producer sends the message in an async way

Repository: camel
Updated Branches:
  refs/heads/master 2657ce788 -> 123f08fa4


CAMEL-9957 camel-kafka producer sends the message in an async way


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/123f08fa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/123f08fa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/123f08fa

Branch: refs/heads/master
Commit: 123f08fa4c7161167e986fed33f318e7b5ad460a
Parents: 2657ce7
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed May 11 11:55:13 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed May 11 11:55:13 2016 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 54 +++++++++++++++++---
 .../component/kafka/KafkaProducerTest.java      | 52 +++++++++++++++++++
 2 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/123f08fa/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4f6468b..0c4013f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,14 +18,18 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
-public class KafkaProducer extends DefaultProducer {
+public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
 
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
@@ -69,9 +73,7 @@ public class KafkaProducer extends DefaultProducer {
         }
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public void process(Exchange exchange) throws CamelException {
+    protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
             topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
@@ -96,9 +98,15 @@ public class KafkaProducer extends DefaultProducer {
             log.warn("No message key or partition key set");
             record = new ProducerRecord(topic, msg);
         }
+        return record;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void process(Exchange exchange) throws CamelException {
 
-        // TODO: add support for async callback
-        // requires a thread pool for processing outgoing routing
+        ProducerRecord record = createRecorder(exchange);
+        // Just send out the record in the sync way
         try {
             kafkaProducer.send(record).get();
         } catch (Exception e) {
@@ -106,4 +114,38 @@ public class KafkaProducer extends DefaultProducer {
         }
     }
 
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            ProducerRecord record = createRecorder(exchange);
+            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
+            // Finishing the processing in an async way
+            return false;
+        } catch (Exception ex) {
+            // Just set the exception back to the client
+            exchange.setException(ex);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    class KafkaProducerCallBack implements Callback {
+
+        private Exchange exchange;
+        private AsyncCallback callback;
+
+        KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+            if (e != null) {
+                // Just set the exception back
+                exchange.setException(e);
+            }
+            callback.done(false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/123f08fa/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 8e94320..62ab51f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -19,13 +19,16 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.ApiException;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -40,6 +43,7 @@ public class KafkaProducerTest {
 
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
+    private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
@@ -75,6 +79,54 @@ public class KafkaProducerTest {
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
+    @Test(expected=CamelException.class)
+    @SuppressWarnings({"unchecked"})
+    public void processSendsMessageWithException() throws Exception {
+        endpoint.setTopic("sometopic");
+        // setup the exception here
+        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        Mockito.when(kp.send(Mockito.any())).thenThrow(new ApiException());
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange);
+
+    }
+
+    @Test
+    public void processAsyncSendsMessage() throws Exception {
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange, callback);
+
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+
+    }
+
+
+    @Test
+    public void processAsyncSendsMessageWithException() throws Exception {
+
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
+        // setup the exception here
+        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        Mockito.when(kp.send(Mockito.any(), Mockito.any())).thenThrow(new ApiException());
+
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange, callback);
+
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+        Mockito.verify(exchange).setException(Matchers.isA(ApiException.class));
+        Mockito.verify(callback).done(Matchers.eq(true));
+    }
+
+
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
         endpoint.setTopic(null);