You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2015/09/03 00:32:36 UTC

[1/2] camel git commit: CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint.

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x bd29f1ccc -> 5ac926c3c
  refs/heads/master dc9e8ebcd -> a9bad7b1a


CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint.

Cherry-picked from 19e70a6a36b75a1e2c3291d9d21df24e263023fa.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ac926c3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ac926c3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ac926c3

Branch: refs/heads/camel-2.15.x
Commit: 5ac926c3ca70dd35e8109405c51b69d87e12ef22
Parents: bd29f1c
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Jul 1 23:03:12 2015 +0800
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Sep 2 23:30:45 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaEndpoint.java    |  8 ++++++++
 .../apache/camel/component/kafka/KafkaProducer.java    |  5 ++++-
 .../camel/component/kafka/KafkaProducerTest.java       | 13 +++++++++++++
 3 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1652d78..9960a89 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -489,4 +489,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public boolean isMultipleConsumersSupported() {
         return true;
     }
+
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0fde1ae..06a0317 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -64,7 +64,10 @@ public class KafkaProducer<K, V> extends DefaultProducer {
     @Override
     @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws CamelException {
-        String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        String topic = endpoint.getTopic();
+        if (!endpoint.isBridgeEndpoint()) {
+            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
+        }
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d989c96..d2e868d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -128,6 +128,19 @@ public class KafkaProducerTest {
 
         verifySendMessage("someKey", "someTopic", "someKey");
     }
+    
+    @Test
+    public void processSendMessageWithBridgeEndpoint() throws Exception {
+        endpoint.setTopic("someTopic");
+        endpoint.setBridgeEndpoint(true);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+        
+        producer.process(exchange);
+        
+        verifySendMessage("someKey", "someTopic", "someKey");
+    }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String messageKey) {


[2/2] camel git commit: CAMEL-8923 Use local var instead of getter again.

Posted by ra...@apache.org.
CAMEL-8923 Use local var instead of getter again.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a9bad7b1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a9bad7b1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a9bad7b1

Branch: refs/heads/master
Commit: a9bad7b1a06758f5dad72e436aed8dc6d21537e5
Parents: dc9e8eb
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Sep 2 23:32:13 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Sep 2 23:32:13 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaProducer.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a9bad7b1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 3bc8e78..06a0317 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -66,7 +66,7 @@ public class KafkaProducer<K, V> extends DefaultProducer {
     public void process(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
-            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
         }
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);