You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/08/16 08:16:35 UTC

[camel] branch master updated: CAMEL-12732: camel-kafka consumer should automatically start/stop offset repo if needed.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 67dbf1f  CAMEL-12732: camel-kafka consumer should automatically start/stop offset repo if needed.
67dbf1f is described below

commit 67dbf1f35a40b4202b005099f2974644046a78ef
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Aug 16 09:36:24 2018 +0200

    CAMEL-12732: camel-kafka consumer should automatically start/stop offset repo if needed.
---
 .../apache/camel/component/kafka/KafkaConsumer.java | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 5f56270..e6913f3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -35,8 +35,10 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -55,6 +57,7 @@ public class KafkaConsumer extends DefaultConsumer {
     private final Long pollTimeoutMs;
     // This list helps working around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
+    private volatile boolean stopOffsetRepo;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -105,6 +108,18 @@ public class KafkaConsumer extends DefaultConsumer {
                 endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();
 
+        // is the offset repository already started?
+        StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+        if (repo instanceof ServiceSupport) {
+            boolean started = ((ServiceSupport) repo).isStarted();
+            // if not already started then we start it here and flag it so this consumer also stops it
+            if (!started) {
+                stopOffsetRepo = true;
+                log.debug("Starting OffsetRepository: {}", repo);
+                ServiceHelper.startService(endpoint.getConfiguration().getOffsetRepository());
+            }
+        }
+
         executor = endpoint.createExecutor();
 
         String topic = endpoint.getConfiguration().getTopic();
@@ -140,6 +155,12 @@ public class KafkaConsumer extends DefaultConsumer {
         tasks.clear();
         executor = null;
 
+        if (stopOffsetRepo) {
+            StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
+            log.debug("Stopping OffsetRepository: {}", repo);
+            ServiceHelper.stopAndShutdownService(repo);
+        }
+
         super.doStop();
     }