You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/08/04 12:57:51 UTC

[camel] branch camel-3.11.x updated: CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.11.x by this push:
     new 88b20d4  CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
88b20d4 is described below

commit 88b20d439b25ba57ae24b81ef6b8d3659618a757
Author: jenskordowski <10...@users.noreply.github.com>
AuthorDate: Wed Aug 4 14:56:53 2021 +0200

    CAMEL-16832: camel-kafka - file descriptor leak contd. (#5925)
    
    Co-authored-by: Jens Kordowski <je...@sap.com>
---
 .../main/java/org/apache/camel/component/kafka/KafkaConsumer.java    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05a734b..331ba9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -240,6 +240,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 first = false;
 
                 if (!isRunAllowed() || isStoppingOrStopped() || isSuspendingOrSuspended()) {
+                    LOG.debug("Closing consumer {}", threadId);
+                    IOHelper.close(consumer);
                     return;
                 }
 
@@ -277,6 +279,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 doReconnectRun();
                 // set reconnect to false as its done now
                 reconnect.set(false);
+                // set retry to true to continue polling
+                retry.set(true);
             }
             // polling
             doPollRun(retry, reconnect);
@@ -502,6 +506,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         }
                         // re-connect so the consumer can try the same message again
                         reconnect.set(true);
+                        retry.set(false); // to close the current consumer
                     } else if (PollOnError.ERROR_HANDLER == onError) {
                         // use bridge error handler to route with exception
                         bridge.handleException(e);