You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2015/09/03 00:32:36 UTC
[1/2] camel git commit: CAMEL-8923 Fixed the infinite loop by adding
bridgeEndpoint option to kafka endpoint.
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x bd29f1ccc -> 5ac926c3c
refs/heads/master dc9e8ebcd -> a9bad7b1a
CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint.
Cherry-picked from 19e70a6a36b75a1e2c3291d9d21df24e263023fa.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ac926c3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ac926c3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ac926c3
Branch: refs/heads/camel-2.15.x
Commit: 5ac926c3ca70dd35e8109405c51b69d87e12ef22
Parents: bd29f1c
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Jul 1 23:03:12 2015 +0800
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Sep 2 23:30:45 2015 +0100
----------------------------------------------------------------------
.../apache/camel/component/kafka/KafkaEndpoint.java | 8 ++++++++
.../apache/camel/component/kafka/KafkaProducer.java | 5 ++++-
.../camel/component/kafka/KafkaProducerTest.java | 13 +++++++++++++
3 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1652d78..9960a89 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -489,4 +489,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
public boolean isMultipleConsumersSupported() {
return true;
}
+
+ public boolean isBridgeEndpoint() {
+ return bridgeEndpoint;
+ }
+
+ public void setBridgeEndpoint(boolean bridgeEndpoint) {
+ this.bridgeEndpoint = bridgeEndpoint;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0fde1ae..06a0317 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -64,7 +64,10 @@ public class KafkaProducer<K, V> extends DefaultProducer {
@Override
@SuppressWarnings("unchecked")
public void process(Exchange exchange) throws CamelException {
- String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+ String topic = endpoint.getTopic();
+ if (!endpoint.isBridgeEndpoint()) {
+ topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
+ }
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5ac926c3/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d989c96..d2e868d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -128,6 +128,19 @@ public class KafkaProducerTest {
verifySendMessage("someKey", "someTopic", "someKey");
}
+
+ @Test
+ public void processSendMessageWithBridgeEndpoint() throws Exception {
+ endpoint.setTopic("someTopic");
+ endpoint.setBridgeEndpoint(true);
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+ in.setHeader(KafkaConstants.KEY, "someKey");
+
+ producer.process(exchange);
+
+ verifySendMessage("someKey", "someTopic", "someKey");
+ }
@SuppressWarnings({"unchecked", "rawtypes"})
protected void verifySendMessage(String partitionKey, String topic, String messageKey) {
[2/2] camel git commit: CAMEL-8923 Use local var instead of getter
again.
Posted by ra...@apache.org.
CAMEL-8923 Use local var instead of getter again.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a9bad7b1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a9bad7b1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a9bad7b1
Branch: refs/heads/master
Commit: a9bad7b1a06758f5dad72e436aed8dc6d21537e5
Parents: dc9e8eb
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Sep 2 23:32:13 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Sep 2 23:32:13 2015 +0100
----------------------------------------------------------------------
.../main/java/org/apache/camel/component/kafka/KafkaProducer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a9bad7b1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 3bc8e78..06a0317 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -66,7 +66,7 @@ public class KafkaProducer<K, V> extends DefaultProducer {
public void process(Exchange exchange) throws CamelException {
String topic = endpoint.getTopic();
if (!endpoint.isBridgeEndpoint()) {
- topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+ topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
}
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);