You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/29 14:26:04 UTC

[camel] branch main updated: CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a1cd467  CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka
a1cd467 is described below

commit a1cd4677f38cd73ee3bebd2f1011efa821fada1a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 29 14:08:26 2021 +0100

    CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index ec51f5d..61fb686 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -99,7 +99,7 @@ class KafkaFetchRecords implements Runnable {
             }
 
             startPolling();
-        } while (isRetrying() || isReconnect());
+        } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
 
         LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
         safeUnsubscribe();
@@ -166,7 +166,7 @@ class KafkaFetchRecords implements Runnable {
             KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-            while (isKafkaConsumerRunnable() && isRetrying() && !isReconnect()) {
+            while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
 
                 processAsyncCommits();
@@ -174,7 +174,7 @@ class KafkaFetchRecords implements Runnable {
                 partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor);
             }
 
-            if (!isReconnect()) {
+            if (!isConnected()) {
                 LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                 commit();
             }