You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/08/07 13:53:27 UTC

[2/2] camel git commit: Kafka offset of the message included as Camel Exchange header.

Kafka offset of the message included as Camel Exchange header.

This may also be helpful for logging purposes.
In the test, I reused an offset value that was already present but
not checked.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c81a0516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c81a0516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c81a0516

Branch: refs/heads/master
Commit: c81a051616ecf97139c2ec3a8b972320b570167b
Parents: 3fc9de7
Author: tarilabs <ma...@gmail.com>
Authored: Fri Aug 7 13:43:07 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 7 14:00:27 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaConstants.java  | 1 +
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../java/org/apache/camel/component/kafka/KafkaEndpointTest.java    | 1 +
 3 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 6c31b65..3397060 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -27,6 +27,7 @@ public final class KafkaConstants {
     public static final String PARTITION = "kafka.EXCHANGE_NAME";
     public static final String KEY = "kafka.CONTENT_TYPE";
     public static final String TOPIC = "kafka.TOPIC";
+    public static final String OFFSET = "kafka.OFFSET";
 
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
     public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";

http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 78863f8..165c984 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -113,6 +113,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         Message message = new DefaultMessage();
         message.setHeader(KafkaConstants.PARTITION, mm.partition());
         message.setHeader(KafkaConstants.TOPIC, mm.topic());
+        message.setHeader(KafkaConstants.OFFSET, mm.offset());
         if (mm.key() != null) {
             message.setHeader(KafkaConstants.KEY, new String(mm.key()));
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index ed4a6d1..be16766 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -44,6 +44,7 @@ public class KafkaEndpointTest {
         assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY));
         assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC));
         assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));
+        assertEquals(56L, exchange.getIn().getHeader(KafkaConstants.OFFSET));
     }
 
     @Test