You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/15 14:53:09 UTC
[camel] branch main updated: CAMEL-17131: avoid duplicate initialization of the Kafka consumer
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cc8b6cb CAMEL-17131: avoid duplicate initialization of the Kafka consumer
cc8b6cb is described below
commit cc8b6cb1afd99415fb7cf10357f2407f129adc27
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 15 13:34:25 2021 +0100
CAMEL-17131: avoid duplicate initialization of the Kafka consumer
---
.../camel/component/kafka/KafkaConsumer.java | 4 --
.../camel/component/kafka/KafkaFetchRecords.java | 44 +++++++++-------------
2 files changed, 18 insertions(+), 30 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ef2e4e9..e3b0c26 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -118,10 +118,6 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
KafkaFetchRecords task = new KafkaFetchRecords(
this, pollExceptionStrategy, bridge, topic, pattern, i + "", getProps());
- // pre-initialize task during startup so if there is any error we
- // have it thrown asap
- task.preInit();
-
executor.submit(task);
tasks.add(task);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index c0119f4..2cc7e59 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -76,33 +76,25 @@ class KafkaFetchRecords implements Runnable {
this.kafkaProps = kafkaProps;
}
- void preInit() {
- createConsumer();
- }
-
@Override
public void run() {
if (!isKafkaConsumerRunnable()) {
return;
}
- if (isRetrying() || isReconnecting()) {
+ do {
try {
- if (isReconnecting()) {
- // re-initialize on re-connect so we have a fresh consumer
- createConsumer();
- }
+ createConsumer();
+
+ initializeConsumer();
} catch (Exception e) {
// ensure this is logged so users can see the problem
LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
+ continue;
}
- long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
- String prefix = isReconnecting() ? "Reconnecting" : "Retrying";
- LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay);
-
- doRun();
- }
+ startPolling();
+ } while (isRetrying() || isReconnecting());
LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
safeUnsubscribe();
@@ -117,6 +109,11 @@ class KafkaFetchRecords implements Runnable {
Thread.currentThread()
.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+ // The Kafka consumer should be null at the first try. For every other reconnection event, it will not
+ long delay = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+ final String prefix = this.consumer == null ? "Connecting" : "Reconnecting";
+ LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", prefix, threadId, delay);
+
// this may throw an exception if something is wrong with kafka consumer
this.consumer = kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
} finally {
@@ -124,19 +121,14 @@ class KafkaFetchRecords implements Runnable {
}
}
- protected void doRun() {
- if (isReconnecting()) {
- subscribe();
-
- // set reconnect to false as the connection and resume is done at this point
- setReconnect(false);
+ private void initializeConsumer() {
+ subscribe();
- // set retry to true to continue polling
- setRetry(true);
- }
+ // set reconnect to false as the connection and resume is done at this point
+ setReconnect(false);
- // start polling
- startPolling();
+ // set retry to true to continue polling
+ setRetry(true);
}
private void subscribe() {