You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/27 08:01:58 UTC

[2/8] git commit: Fix the KafkaConsumer to put the message in the body Right now, the consumer would create an exchange for each received message. However, it didn't fill the exchange body with the received message content.

Fix the KafkaConsumer to put the message in the body
Until now, the consumer created an exchange for each received message.
However, it did not fill the exchange body with the received message content.

With this change, the body is set as an array of bytes; in the future we could use the Consumer decoder class to convert the content to the right type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a2fd504a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a2fd504a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a2fd504a

Branch: refs/heads/master
Commit: a2fd504ac48a9f4759f59b7d2f028df1dc7087c4
Parents: 06ffb5d
Author: Fabien Chaillou <fa...@gmail.com>
Authored: Wed Mar 26 15:14:10 2014 -0400
Committer: Fabien Chaillou <fa...@gmail.com>
Committed: Wed Mar 26 15:14:10 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b700850..f88e3d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -111,6 +111,7 @@ public class KafkaEndpoint extends DefaultEndpoint {
         message.setHeader(KafkaConstants.PARTITION, mm.partition());
         message.setHeader(KafkaConstants.TOPIC, mm.topic());
         message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setBody(mm.message());
         exchange.setIn(message);
 
         return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index cb9be59..a8ca6c3 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,6 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
     @Test
     public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
+        to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" );
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);