You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/13 11:37:51 UTC

[camel] branch camel-3.18.x updated: camel-kafka: prevent exceptions in close from leaking (CAMEL-18796) (#8885)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 326b396f529 camel-kafka: prevent exceptions in close from leaking (CAMEL-18796) (#8885)
326b396f529 is described below

commit 326b396f529aad47ea8c64f0f9048758bbd5a034
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Dec 13 12:37:45 2022 +0100

    camel-kafka: prevent exceptions in close from leaking (CAMEL-18796) (#8885)
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9a60cce760f..9224ffa129b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -190,7 +190,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         } finally {
             if (consumer != null) {
                 consumer.unsubscribe();
-                consumer.close(Duration.ofSeconds(5));
+                try {
+                    consumer.close(Duration.ofSeconds(5));
+                } catch (Exception e) {
+                    LOG.warn("Error closing the consumer: {} (this error will be ignored)", e.getMessage(), e);
+                }
             }
         }
     }
@@ -373,6 +377,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             IOHelper.close(producer, "Kafka producer", LOG);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.warn("Error closing the Kafka producer: {} (this error will be ignored)", e.getMessage(), e);
         } finally {
             lock.unlock();
         }