You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/21 09:15:10 UTC
[2/3] camel git commit: added seek support for kafka client
added seek support for kafka client
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a1a250a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a1a250a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a1a250a
Branch: refs/heads/master
Commit: 0a1a250a2d4873c225bffd7b08541741a84713e1
Parents: d48cfee
Author: Sebastian Rühl <se...@senacor.com>
Authored: Wed Mar 16 14:51:56 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 21 09:09:36 2016 +0100
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 7 ++++-
.../camel/component/kafka/KafkaEndpoint.java | 13 +++++++++
.../component/kafka/KafkaConsumerFullTest.java | 30 +++++++++++++++++++-
3 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e65ed3b..8cf2ff9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -36,7 +36,7 @@ public class KafkaConsumer extends DefaultConsumer {
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private final Processor processor;
-
+
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -103,6 +103,11 @@ public class KafkaConsumer extends DefaultConsumer {
try {
LOG.debug("Subscribing {} to topic {}", threadId, topicName);
consumer.subscribe(Arrays.asList(topicName));
+
+ if (endpoint.isSeekToBeginning()) {
+ LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
+ consumer.seekToBeginning();
+ }
while (isRunAllowed() && !isSuspendingOrSuspended()) {
ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<Object, Object> record : records) {
http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5a56c39..3db41e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -40,6 +40,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
private KafkaConfiguration configuration = new KafkaConfiguration();
@UriParam
private boolean bridgeEndpoint;
+ @UriParam
+ private boolean seekToBeginning;
public KafkaEndpoint() {
}
@@ -673,4 +675,15 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
public void setBridgeEndpoint(boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
+
+ public boolean isSeekToBeginning() {
+ return seekToBeginning;
+ }
+
+ /**
+ * If the option is true, then the Kafka consumer will read from the beginning of the topic on startup.
+ */
+ public void setSeekToBeginning(boolean seekToBeginning) {
+ this.seekToBeginning = seekToBeginning;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a1a250a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 4f8a0fd..f032ed0 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -70,7 +70,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
@Override
public void configure() throws Exception {
- from(from).to(to);
+ from(from).routeId("foo").to(to);
}
};
}
@@ -87,5 +87,33 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
to.assertIsSatisfied(3000);
}
+ @Test
+ public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg);
+ producer.send(data);
+ }
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+
+ // Restart the route with seekToBeginning enabled on the endpoint
+ context.stopRoute("foo");
+
+ KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
+ kafkaEndpoint.setSeekToBeginning(true);
+
+ context.startRoute("foo");
+
+ // As we set seek to beginning, we should re-consume all messages
+ to.assertIsSatisfied(3000);
+ }
+
}