Posted to commits@camel.apache.org by or...@apache.org on 2021/11/15 14:53:09 UTC

[camel] branch main updated: CAMEL-17131: avoid duplicate initialization of the Kafka consumer

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new cc8b6cb  CAMEL-17131: avoid duplicate initialization of the Kafka consumer
cc8b6cb is described below

commit cc8b6cb1afd99415fb7cf10357f2407f129adc27
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 15 13:34:25 2021 +0100

    CAMEL-17131: avoid duplicate initialization of the Kafka consumer
---
 .../camel/component/kafka/KafkaConsumer.java       |  4 --
 .../camel/component/kafka/KafkaFetchRecords.java   | 44 +++++++++-------------
 2 files changed, 18 insertions(+), 30 deletions(-)
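The net effect of the change: the eager preInit() call is removed from KafkaConsumer, and KafkaFetchRecords.run() now drives connection, subscription and polling from a single do/while loop that repeats while the retry or reconnect flags are set, so the Kafka client is created once per (re)connection attempt instead of once in preInit() and again on reconnect. A rough sketch of that control flow follows; it is illustrative only: the method names mirror the diff, but the class name, fields and stub bodies are placeholders, and the real KafkaFetchRecords carries additional state, logging and shutdown handling.

    // Illustrative sketch of the reworked run() loop in KafkaFetchRecords.
    // Method names follow the diff; bodies are simplified placeholders.
    class FetchLoopSketch implements Runnable {

        private volatile boolean retry = true;      // keep polling after a recoverable error
        private volatile boolean reconnect = false; // rebuild the consumer before polling again
        private Object consumer;                    // stands in for org.apache.kafka.clients.consumer.KafkaConsumer

        @Override
        public void run() {
            do {
                try {
                    createConsumer();     // may fail if the client cannot be built
                    initializeConsumer(); // subscribe, then reset the flags
                } catch (Exception e) {
                    // a real implementation logs the failure here, then skips
                    // to the loop condition, mirroring the 'continue' in the commit
                    continue;
                }
                startPolling();           // in the real class this blocks until an error or shutdown
            } while (isRetrying() || isReconnecting());
        }

        private void createConsumer() {
            consumer = new Object();      // placeholder for KafkaClientFactory.getConsumer(kafkaProps)
        }

        private void initializeConsumer() {
            // subscribe() would happen here, then:
            reconnect = false;            // connection and resume are done at this point
            retry = true;                 // continue polling
        }

        private void startPolling() {
            retry = false;                // stop immediately so this sketch terminates when run
        }

        private boolean isRetrying() {
            return retry;
        }

        private boolean isReconnecting() {
            return reconnect;
        }
    }

Because createConsumer() and initializeConsumer() run at the top of every loop iteration, a reconnect no longer needs the separate "re-initialize on re-connect" branch that the old run() carried, which is what removes the duplicate initialization.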

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ef2e4e9..e3b0c26 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -118,10 +118,6 @@ public class KafkaConsumer extends DefaultConsumer {
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(
                     this, pollExceptionStrategy, bridge, topic, pattern, i + "", getProps());
-            // pre-initialize task during startup so if there is any error we
-            // have it thrown asap
-            task.preInit();
-
             executor.submit(task);
 
             tasks.add(task);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index c0119f4..2cc7e59 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -76,33 +76,25 @@ class KafkaFetchRecords implements Runnable {
         this.kafkaProps = kafkaProps;
     }
 
-    void preInit() {
-        createConsumer();
-    }
-
     @Override
     public void run() {
         if (!isKafkaConsumerRunnable()) {
             return;
         }
 
-        if (isRetrying() || isReconnecting()) {
+        do {
             try {
-                if (isReconnecting()) {
-                    // re-initialize on re-connect so we have a fresh consumer
-                    createConsumer();
-                }
+                createConsumer();
+
+                initializeConsumer();
             } catch (Exception e) {
                 // ensure this is logged so users can see the problem
                 LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
+                continue;
             }
 
-            long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-            String prefix = isReconnecting() ? "Reconnecting" : "Retrying";
-            LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay);
-
-            doRun();
-        }
+            startPolling();
+        } while (isRetrying() || isReconnecting());
 
         LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
         safeUnsubscribe();
@@ -117,6 +109,11 @@ class KafkaFetchRecords implements Runnable {
             Thread.currentThread()
                     .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
 
+            // The Kafka consumer should be null at the first try. For every other reconnection event, it will not
+            long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+            final String prefix = this.consumer == null ? "Connecting" : "Reconnecting";
+            LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", prefix, threadId, delay);
+
             // this may throw an exception if something is wrong with kafka consumer
             this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
         } finally {
@@ -124,19 +121,14 @@ class KafkaFetchRecords implements Runnable {
         }
     }
 
-    protected void doRun() {
-        if (isReconnecting()) {
-            subscribe();
-
-            // set reconnect to false as the connection and resume is done at this point
-            setReconnect(false);
+    private void initializeConsumer() {
+        subscribe();
 
-            // set retry to true to continue polling
-            setRetry(true);
-        }
+        // set reconnect to false as the connection and resume is done at this point
+        setReconnect(false);
 
-        // start polling
-        startPolling();
+        // set retry to true to continue polling
+        setRetry(true);
     }
 
     private void subscribe() {