You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/08/16 08:16:35 UTC
[camel] branch master updated: CAMEL-12732: camel-kafka consumer
should automatic start/stop offset repo if needed.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 67dbf1f CAMEL-12732: camel-kafka consumer should automatic start/stop offset repo if needed.
67dbf1f is described below
commit 67dbf1f35a40b4202b005099f2974644046a78ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Aug 16 09:36:24 2018 +0200
CAMEL-12732: camel-kafka consumer should automatic start/stop offset repo if needed.
---
.../apache/camel/component/kafka/KafkaConsumer.java | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 5f56270..e6913f3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -35,8 +35,10 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
+import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -55,6 +57,7 @@ public class KafkaConsumer extends DefaultConsumer {
private final Long pollTimeoutMs;
// This list helps working around the infinite loop of KAFKA-1894
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
+ private volatile boolean stopOffsetRepo;
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -105,6 +108,18 @@ public class KafkaConsumer extends DefaultConsumer {
endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError());
super.doStart();
+ // is the offset repository already started?
+ StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+ if (repo instanceof ServiceSupport) {
+ boolean started = ((ServiceSupport) repo).isStarted();
+ // if not already started then we would do that and also stop it
+ if (!started) {
+ stopOffsetRepo = true;
+ log.debug("Starting OffsetRepository: {}", repo);
+ ServiceHelper.startService(endpoint.getConfiguration().getOffsetRepository());
+ }
+ }
+
executor = endpoint.createExecutor();
String topic = endpoint.getConfiguration().getTopic();
@@ -140,6 +155,12 @@ public class KafkaConsumer extends DefaultConsumer {
tasks.clear();
executor = null;
+ if (stopOffsetRepo) {
+ StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+ log.debug("Stopping OffsetRepository: {}", repo);
+ ServiceHelper.stopAndShutdownService(repo);
+ }
+
super.doStop();
}