You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 09:24:37 UTC
[1/2] camel git commit: CAMEL-9790: When sending to kafka then Camel
should catch exceptions so Camel error handler can react.
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x e4dbb8e37 -> 77428ae1c
refs/heads/master 1413c1f47 -> 32417731b
CAMEL-9790: When sending to kafka then Camel should catch exceptions so Camel error handler can react.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/32417731
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/32417731
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/32417731
Branch: refs/heads/master
Commit: 32417731bced91047f39f9eb17cc8d3d6e7f58cb
Parents: 1413c1f
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 09:23:29 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 09:23:29 2016 +0200
----------------------------------------------------------------------
.../org/apache/camel/component/kafka/KafkaProducer.java | 9 +++++++--
.../apache/camel/component/kafka/KafkaProducerTest.java | 12 +++++++++++-
2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/32417731/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6f9ea79..4f6468b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -97,8 +97,13 @@ public class KafkaProducer extends DefaultProducer {
record = new ProducerRecord(topic, msg);
}
- // TODO: add support for async callback in the send
- kafkaProducer.send(record);
+ // TODO: add support for async callback
+ // requires a thread pool for processing outgoing routing
+ try {
+ kafkaProducer.send(record).get();
+ } catch (Exception e) {
+ throw new CamelException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/32417731/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 98f6421..8e94320 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
@@ -24,6 +25,7 @@ import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
@@ -45,7 +47,15 @@ public class KafkaProducerTest {
"kafka:broker1:1234,broker2:4567?topic=sometopic", null);
endpoint.setBrokers("broker1:1234,broker2:4567");
producer = new KafkaProducer(endpoint);
- producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
+
+
+ RecordMetadata rm = new RecordMetadata(null, 1, 1);
+ Future future = Mockito.mock(Future.class);
+ Mockito.when(future.get()).thenReturn(rm);
+ org.apache.kafka.clients.producer.KafkaProducer kp = Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
+ Mockito.when(kp.send(Mockito.any())).thenReturn(future);
+
+ producer.setKafkaProducer(kp);
}
@Test
[2/2] camel git commit: CAMEL-9790: When sending to kafka then Camel
should catch exceptions so Camel error handler can react.
Posted by da...@apache.org.
CAMEL-9790: When sending to kafka then Camel should catch exceptions so Camel error handler can react.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77428ae1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77428ae1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77428ae1
Branch: refs/heads/camel-2.17.x
Commit: 77428ae1ca262c7ea895ead50cd3e931d2c66286
Parents: e4dbb8e
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 09:23:29 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 09:24:24 2016 +0200
----------------------------------------------------------------------
.../org/apache/camel/component/kafka/KafkaProducer.java | 9 +++++++--
.../apache/camel/component/kafka/KafkaProducerTest.java | 12 +++++++++++-
2 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6f9ea79..4f6468b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -97,8 +97,13 @@ public class KafkaProducer extends DefaultProducer {
record = new ProducerRecord(topic, msg);
}
- // TODO: add support for async callback in the send
- kafkaProducer.send(record);
+ // TODO: add support for async callback
+ // requires a thread pool for processing outgoing routing
+ try {
+ kafkaProducer.send(record).get();
+ } catch (Exception e) {
+ throw new CamelException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/77428ae1/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 98f6421..8e94320 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
@@ -24,6 +25,7 @@ import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
@@ -45,7 +47,15 @@ public class KafkaProducerTest {
"kafka:broker1:1234,broker2:4567?topic=sometopic", null);
endpoint.setBrokers("broker1:1234,broker2:4567");
producer = new KafkaProducer(endpoint);
- producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
+
+
+ RecordMetadata rm = new RecordMetadata(null, 1, 1);
+ Future future = Mockito.mock(Future.class);
+ Mockito.when(future.get()).thenReturn(rm);
+ org.apache.kafka.clients.producer.KafkaProducer kp = Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
+ Mockito.when(kp.send(Mockito.any())).thenReturn(future);
+
+ producer.setKafkaProducer(kp);
}
@Test