You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/08/04 12:57:17 UTC
[camel] branch main updated: CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 34442f1 CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
34442f1 is described below
commit 34442f13e7c8e5367643eb47a1f6034de57e45fa
Author: jenskordowski <10...@users.noreply.github.com>
AuthorDate: Wed Aug 4 14:56:53 2021 +0200
CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
Co-authored-by: Jens Kordowski <je...@sap.com>
---
.../main/java/org/apache/camel/component/kafka/KafkaConsumer.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05a734b..331ba9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -240,6 +240,8 @@ public class KafkaConsumer extends DefaultConsumer {
first = false;
if (!isRunAllowed() || isStoppingOrStopped() || isSuspendingOrSuspended()) {
+ LOG.debug("Closing consumer {}", threadId);
+ IOHelper.close(consumer);
return;
}
@@ -277,6 +279,8 @@ public class KafkaConsumer extends DefaultConsumer {
doReconnectRun();
// set reconnect to false as its done now
reconnect.set(false);
+ // set retry to true to continue polling
+ retry.set(true);
}
// polling
doPollRun(retry, reconnect);
@@ -502,6 +506,7 @@ public class KafkaConsumer extends DefaultConsumer {
}
// re-connect so the consumer can try the same message again
reconnect.set(true);
+ retry.set(false); // to close the current consumer
} else if (PollOnError.ERROR_HANDLER == onError) {
// use bridge error handler to route with exception
bridge.handleException(e);