You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/02/27 15:07:00 UTC

camel git commit: CAMEL-10901 Implemented support for Kafka consumer to seek to end of topic on startup

Repository: camel
Updated Branches:
  refs/heads/master 6ed2c587e -> aaf83ceaf


CAMEL-10901 Implemented support for Kafka consumer to seek to end of topic on startup


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aaf83cea
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aaf83cea
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aaf83cea

Branch: refs/heads/master
Commit: aaf83ceaf1bdefb77f5cd509e60cce14ce7cacb1
Parents: 6ed2c58
Author: Sverker Abrahamsson <sv...@limetransit.com>
Authored: Mon Feb 27 01:08:14 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 27 16:04:31 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  2 +-
 .../component/kafka/KafkaConfiguration.java     | 18 ++++++-----
 .../camel/component/kafka/KafkaConsumer.java    | 17 +++++++---
 .../component/kafka/KafkaConsumerFullTest.java  | 33 +++++++++++++++++++-
 4 files changed, 56 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/aaf83cea/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 99d804b..b6c7887 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -80,7 +80,7 @@ The Kafka component supports 80 endpoint options which are listed below:
 | offsetRepository | consumer |  | String> | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit.
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
 | pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
-| seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer will read from beginning on startup.
+| seekTo | consumer |  | String | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.

http://git-wip-us.apache.org/repos/asf/camel/blob/aaf83cea/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 57eea8f..8807d29 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -106,8 +106,8 @@ public class KafkaConfiguration {
     //fetch.max.wait.ms
     @UriParam(label = "consumer", defaultValue = "500")
     private Integer fetchWaitMaxMs = 500;
-    @UriParam(label = "consumer")
-    private boolean seekToBeginning;
+    @UriParam(label = "consumer", enums = "beginning,end")
+    private String seekTo;
 
     //Consumer configuration properties
     @UriParam(label = "consumer")
@@ -1305,15 +1305,19 @@ public class KafkaConfiguration {
         this.valueDeserializer = valueDeserializer;
     }
 
-    public boolean isSeekToBeginning() {
-        return seekToBeginning;
+    public String getSeekTo() {
+        return seekTo;
     }
 
     /**
-     * If the option is true, then KafkaConsumer will read from beginning on startup.
+     * Set if KafkaConsumer will read from beginning or end on startup:
+     * beginning : read from beginning
+     * end : read from end
+     * 
+     * This is replacing the earlier property seekToBeginning
      */
-    public void setSeekToBeginning(boolean seekToBeginning) {
-        this.seekToBeginning = seekToBeginning;
+    public void setSeekTo(String seekTo) {
+        this.seekTo = seekTo;
     }
 
     public ExecutorService getWorkerPool() {

http://git-wip-us.apache.org/repos/asf/camel/blob/aaf83cea/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 0e38543..711da6b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -172,11 +172,18 @@ public class KafkaConsumer extends DefaultConsumer {
                             }
                         }
                     }
-                } else if (endpoint.getConfiguration().isSeekToBeginning()) {
-                    LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
-                    // This poll to ensures we have an assigned partition otherwise seek won't work
-                    consumer.poll(100);
-                    consumer.seekToBeginning(consumer.assignment());
+                } else if (endpoint.getConfiguration().getSeekTo() != null) {
+                    if (endpoint.getConfiguration().getSeekTo().equals("beginning")) {
+                        LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
+                        // This poll to ensures we have an assigned partition otherwise seek won't work
+                        consumer.poll(100);
+                        consumer.seekToBeginning(consumer.assignment());
+                    } else if (endpoint.getConfiguration().getSeekTo().equals("end")) {
+                        LOG.debug("{} is seeking to the end on topic {}", threadId, topicName);
+                        // This poll to ensures we have an assigned partition otherwise seek won't work
+                        consumer.poll(100);
+                        consumer.seekToEnd(consumer.assignment());
+                    }
                 }
                 while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);

http://git-wip-us.apache.org/repos/asf/camel/blob/aaf83cea/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 183d041..f90a9c6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -115,12 +115,43 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         context.stopRoute("foo");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
-        kafkaEndpoint.getConfiguration().setSeekToBeginning(true);
+        kafkaEndpoint.getConfiguration().setSeekTo("beginning");
 
         context.startRoute("foo");
 
         // As wee set seek to beginning we should re-consume all messages
         to.assertIsSatisfied(3000);
     }
+
+    @Test
+    @Ignore("Currently there is a bug in kafka which leads to an uninterruptable thread so a resub take too long (works manually)")
+    public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        to.expectedMessageCount(0);
+
+        //Restart endpoint,
+        context.stopRoute("foo");
+
+        KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
+        kafkaEndpoint.getConfiguration().setSeekTo("end");
+
+        context.startRoute("foo");
+
+        // As wee set seek to end we should not re-consume any messages
+        synchronized (this) {
+            Thread.sleep(1000);
+        }
+        to.assertIsSatisfied(3000);
+    }
 }