You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/10 07:42:01 UTC
[camel] branch master updated: CAMEL-13630: Added a bit of logging
and only close kafka producer if its created by Camel itself.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 295d39c CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself.
295d39c is described below
commit 295d39c153a013274319f9b66bc1e49fc7c89032
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 10 09:40:55 2019 +0200
CAMEL-13630: Added a bit of logging and only close kafka producer if its created by Camel itself.
---
.../main/java/org/apache/camel/component/kafka/KafkaProducer.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 16ba065..c43d824 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -53,6 +53,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
private final KafkaEndpoint endpoint;
private ExecutorService workerPool;
private boolean shutdownWorkerPool;
+ private volatile boolean closeKafkaProducer;
public KafkaProducer(KafkaEndpoint endpoint) {
super(endpoint);
@@ -106,10 +107,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
try {
// Kafka uses reflection for loading authentication settings, use its classloader
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+ log.trace("Creating KafkaProducer");
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
+ closeKafkaProducer = true;
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
}
+ log.debug("Created KafkaProducer: {}", kafkaProducer);
}
// if we are in asynchronous mode we need a worker pool
@@ -122,8 +126,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
@Override
protected void doStop() throws Exception {
- if (kafkaProducer != null) {
+ if (kafkaProducer != null && closeKafkaProducer) {
+ log.debug("Closing KafkaProducer: {}", kafkaProducer);
kafkaProducer.close();
+ kafkaProducer = null;
}
if (shutdownWorkerPool && workerPool != null) {