You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 09:24:38 UTC

[2/2] camel git commit: CAMEL-9790: When sending to Kafka, Camel should catch exceptions so that the Camel error handler can react.

CAMEL-9790: When sending to Kafka, Camel should catch exceptions so that the Camel error handler can react.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77428ae1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77428ae1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77428ae1

Branch: refs/heads/camel-2.17.x
Commit: 77428ae1ca262c7ea895ead50cd3e931d2c66286
Parents: e4dbb8e
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 09:23:29 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 09:24:24 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/kafka/KafkaProducer.java |  9 +++++++--
 .../apache/camel/component/kafka/KafkaProducerTest.java | 12 +++++++++++-
 2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6f9ea79..4f6468b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -97,8 +97,13 @@ public class KafkaProducer extends DefaultProducer {
             record = new ProducerRecord(topic, msg);
         }
 
-        // TODO: add support for async callback in the send
-        kafkaProducer.send(record);
+        // TODO: add support for an async callback, which
+        // requires a thread pool for processing the outgoing routing
+        try {
+            kafkaProducer.send(record).get();
+        } catch (Exception e) {
+            throw new CamelException(e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 98f6421..8e94320 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -24,6 +25,7 @@ import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -45,7 +47,15 @@ public class KafkaProducerTest {
                 "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
-        producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
+
+
+        RecordMetadata rm = new RecordMetadata(null, 1, 1);
+        Future future = Mockito.mock(Future.class);
+        Mockito.when(future.get()).thenReturn(rm);
+        org.apache.kafka.clients.producer.KafkaProducer kp = Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
+        Mockito.when(kp.send(Mockito.any())).thenReturn(future);
+
+        producer.setKafkaProducer(kp);
     }
 
     @Test