You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/10 07:42:01 UTC

[camel] branch master updated: CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 295d39c  CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself.
295d39c is described below

commit 295d39c153a013274319f9b66bc1e49fc7c89032
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 10 09:40:55 2019 +0200

    CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself.
---
 .../main/java/org/apache/camel/component/kafka/KafkaProducer.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 16ba065..c43d824 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -53,6 +53,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     private final KafkaEndpoint endpoint;
     private ExecutorService workerPool;
     private boolean shutdownWorkerPool;
+    private volatile boolean closeKafkaProducer;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
         super(endpoint);
@@ -106,10 +107,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
             try {
                 // Kafka uses reflection for loading authentication settings, use its classloader
                 Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+                log.trace("Creating KafkaProducer");
                 kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
+                closeKafkaProducer = true;
             } finally {
                 Thread.currentThread().setContextClassLoader(threadClassLoader);
             }
+            log.debug("Created KafkaProducer: {}", kafkaProducer);
         }
 
         // if we are in asynchronous mode we need a worker pool
@@ -122,8 +126,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
     @Override
     protected void doStop() throws Exception {
-        if (kafkaProducer != null) {
+        if (kafkaProducer != null && closeKafkaProducer) {
+            log.debug("Closing KafkaProducer: {}", kafkaProducer);
             kafkaProducer.close();
+            kafkaProducer = null;
         }
 
         if (shutdownWorkerPool && workerPool != null) {