You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/29 14:26:04 UTC
[camel] branch main updated: CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a1cd467 CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka
a1cd467 is described below
commit a1cd4677f38cd73ee3bebd2f1011efa821fada1a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 29 14:08:26 2021 +0100
CAMEL-17245: prevent bogus commit calls when shutting down camel-kafka
---
.../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index ec51f5d..61fb686 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -99,7 +99,7 @@ class KafkaFetchRecords implements Runnable {
}
startPolling();
- } while (isRetrying() || isReconnect());
+ } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
safeUnsubscribe();
@@ -166,7 +166,7 @@ class KafkaFetchRecords implements Runnable {
KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
- while (isKafkaConsumerRunnable() && isRetrying() && !isReconnect()) {
+ while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) {
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
processAsyncCommits();
@@ -174,7 +174,7 @@ class KafkaFetchRecords implements Runnable {
partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor);
}
- if (!isReconnect()) {
+ if (!isConnected()) {
LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
commit();
}