You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/21 09:15:10 UTC

[2/3] camel git commit: added seek support for kafka client

added seek support for kafka client


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a1a250a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a1a250a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a1a250a

Branch: refs/heads/master
Commit: 0a1a250a2d4873c225bffd7b08541741a84713e1
Parents: d48cfee
Author: Sebastian Rühl <se...@senacor.com>
Authored: Wed Mar 16 14:51:56 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 21 09:09:36 2016 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    |  7 ++++-
 .../camel/component/kafka/KafkaEndpoint.java    | 13 +++++++++
 .../component/kafka/KafkaConsumerFullTest.java  | 30 +++++++++++++++++++-
 3 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e65ed3b..8cf2ff9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -36,7 +36,7 @@ public class KafkaConsumer extends DefaultConsumer {
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
-    
+
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -103,6 +103,11 @@ public class KafkaConsumer extends DefaultConsumer {
             try {
                 LOG.debug("Subscribing {} to topic {}", threadId, topicName);
                 consumer.subscribe(Arrays.asList(topicName));
+
+                if (endpoint.isSeekToBeginning()) {
+                    LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
+                    consumer.seekToBeginning();
+                }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
                     for (ConsumerRecord<Object, Object> record : records) {

http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5a56c39..3db41e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -40,6 +40,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam
     private boolean bridgeEndpoint;
+    @UriParam
+    private boolean seekToBeginning;
 
     public KafkaEndpoint() {
     }
@@ -673,4 +675,15 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public boolean isSeekToBeginning() {
+        return seekToBeginning;
+    }
+
+    /**
+     * If the option is true, then KafkaConsumer will read from beginning on startup.
+     */
+    public void setSeekToBeginning(boolean seekToBeginning) {
+        this.seekToBeginning = seekToBeginning;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 4f8a0fd..f032ed0 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -70,7 +70,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
             @Override
             public void configure() throws Exception {
-                from(from).to(to);
+                from(from).routeId("foo").to(to);
             }
         };
     }
@@ -87,5 +87,33 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         to.assertIsSatisfied(3000);
     }
 
+    @Test
+    public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+
+        //Restart endpoint,
+        context.stopRoute("foo");
+
+        KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
+        kafkaEndpoint.setSeekToBeginning(true);
+
+        context.startRoute("foo");
+
+        // As wee set seek to beginning we should re-consume all messages
+        to.assertIsSatisfied(3000);
+    }
+
 }