You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:50:58 UTC

[07/50] [abbrv] camel git commit: CAMEL-9957: Fixed as it was not correct.

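CAMEL-9957: Fixed KafkaProducer, as it was not correct: the asynchronous process method now processes synchronously when the endpoint is configured as synchronous, and process(Exchange) propagates the original exception instead of wrapping it in a CamelException.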


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1428ccfc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1428ccfc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1428ccfc

Branch: refs/heads/kube-lb
Commit: 1428ccfce8d68511b8233834b9fa42b50fe9d146
Parents: abdea8e
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 10:51:57 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 27 ++++++++++++--------
 .../component/kafka/KafkaProducerTest.java      |  6 +----
 2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0c4013f..6c432d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -73,6 +73,7 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
 
+    @SuppressWarnings("unchecked")
     protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
@@ -103,19 +104,25 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void process(Exchange exchange) throws CamelException {
-
+    public void process(Exchange exchange) throws Exception {
         ProducerRecord record = createRecorder(exchange);
         // Just send out the record in the sync way
-        try {
-            kafkaProducer.send(record).get();
-        } catch (Exception e) {
-            throw new CamelException(e);
-        }
+        kafkaProducer.send(record).get();
     }
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        // force processing synchronously using different api
+        if (endpoint.isSynchronous()) {
+            try {
+                process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
+
         try {
             ProducerRecord record = createRecorder(exchange);
             kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
@@ -129,10 +136,10 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
 
-    class KafkaProducerCallBack implements Callback {
+    private final class KafkaProducerCallBack implements Callback {
 
-        private Exchange exchange;
-        private AsyncCallback callback;
+        private final Exchange exchange;
+        private final AsyncCallback callback;
 
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/1428ccfc/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 68798f3..40f2113 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -79,7 +79,7 @@ public class KafkaProducerTest {
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
-    @Test(expected=CamelException.class)
+    @Test(expected = Exception.class)
     @SuppressWarnings({"unchecked"})
     public void processSendsMessageWithException() throws Exception {
         endpoint.setTopic("sometopic");
@@ -90,7 +90,6 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
-
     }
 
     @Test
@@ -103,10 +102,8 @@ public class KafkaProducerTest {
         producer.process(exchange, callback);
 
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
-
     }
 
-
     @Test
     public void processAsyncSendsMessageWithException() throws Exception {
 
@@ -126,7 +123,6 @@ public class KafkaProducerTest {
         Mockito.verify(callback).done(Matchers.eq(true));
     }
 
-
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
         endpoint.setTopic(null);