You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/07/26 14:05:17 UTC

[camel] branch camel-3.x updated: CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new e158a82b347 CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)
e158a82b347 is described below

commit e158a82b34747ae4d0bae213c5b4d169ad8c24da
Author: Kartik  kalaghatgi <ka...@gmail.com>
AuthorDate: Wed Jul 26 19:35:10 2023 +0530

    CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)
    
    * CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
    
    Camel kafka component when provided with custom worker pool is ignored and still creates a new pool, this is fixed with check added to use custom worker pool if provided.
    
    * CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
    
    Fixing checkstyle issue
---
 .../java/org/apache/camel/component/kafka/KafkaProducer.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 95afbde2960..d1dcaa30bbe 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -169,9 +169,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         // if we are in asynchronous mode we need a worker pool
         if (!configuration.isSynchronous() && workerPool == null) {
-            workerPool = endpoint.createProducerExecutor();
-            // we create a thread pool so we should also shut it down
-            shutdownWorkerPool = true;
+            // If custom worker pool is provided, then use it, else create a new one.
+            if (configuration.getWorkerPool() != null) {
+                workerPool = configuration.getWorkerPool();
+                shutdownWorkerPool = false;
+            } else {
+                workerPool = endpoint.createProducerExecutor();
+                // we create a thread pool so we should also shut it down
+                shutdownWorkerPool = true;
+            }
         }
 
         // init client id which we may need to get from the kafka producer via reflection