You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:50:58 UTC
[07/50] [abbrv] camel git commit: CAMEL-9957: Fixed as it was not correct.
CAMEL-9957: Fixed as it was not correct.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1428ccfc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1428ccfc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1428ccfc
Branch: refs/heads/kube-lb
Commit: 1428ccfce8d68511b8233834b9fa42b50fe9d146
Parents: abdea8e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 10:51:57 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaProducer.java | 27 ++++++++++++--------
.../component/kafka/KafkaProducerTest.java | 6 +----
2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0c4013f..6c432d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -73,6 +73,7 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
}
}
+ @SuppressWarnings("unchecked")
protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
String topic = endpoint.getTopic();
if (!endpoint.isBridgeEndpoint()) {
@@ -103,19 +104,25 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
@Override
@SuppressWarnings("unchecked")
- public void process(Exchange exchange) throws CamelException {
-
+ public void process(Exchange exchange) throws Exception {
ProducerRecord record = createRecorder(exchange);
// Just send out the record in the sync way
- try {
- kafkaProducer.send(record).get();
- } catch (Exception e) {
- throw new CamelException(e);
- }
+ kafkaProducer.send(record).get();
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ // force processing synchronously using different api
+ if (endpoint.isSynchronous()) {
+ try {
+ process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ callback.done(true);
+ return true;
+ }
+
try {
ProducerRecord record = createRecorder(exchange);
kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
@@ -129,10 +136,10 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
}
}
- class KafkaProducerCallBack implements Callback {
+ private final class KafkaProducerCallBack implements Callback {
- private Exchange exchange;
- private AsyncCallback callback;
+ private final Exchange exchange;
+ private final AsyncCallback callback;
KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
this.exchange = exchange;
http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 68798f3..40f2113 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -79,7 +79,7 @@ public class KafkaProducerTest {
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
}
- @Test(expected=CamelException.class)
+ @Test(expected = Exception.class)
@SuppressWarnings({"unchecked"})
public void processSendsMessageWithException() throws Exception {
endpoint.setTopic("sometopic");
@@ -90,7 +90,6 @@ public class KafkaProducerTest {
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
producer.process(exchange);
-
}
@Test
@@ -103,10 +102,8 @@ public class KafkaProducerTest {
producer.process(exchange, callback);
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
-
}
-
@Test
public void processAsyncSendsMessageWithException() throws Exception {
@@ -126,7 +123,6 @@ public class KafkaProducerTest {
Mockito.verify(callback).done(Matchers.eq(true));
}
-
@Test
public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
endpoint.setTopic(null);