You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/07/26 14:13:32 UTC

[camel] 01/01: CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-19650/3.20
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 367b98b0e635b002b5ab3a95f5a0a28ce1aaab6c
Author: Kartik  kalaghatgi <ka...@gmail.com>
AuthorDate: Wed Jul 26 19:35:10 2023 +0530

    CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)
    
    * CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
    
    Camel kafka component when provided with custom worker pool is ignored and still creates a new pool, this is fixed with check added to use custom worker pool if provided.
    
    * CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
    
    Fixing checkstyle issue
---
 .../java/org/apache/camel/component/kafka/KafkaProducer.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 95afbde2960..d1dcaa30bbe 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -169,9 +169,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         // if we are in asynchronous mode we need a worker pool
         if (!configuration.isSynchronous() && workerPool == null) {
-            workerPool = endpoint.createProducerExecutor();
-            // we create a thread pool so we should also shut it down
-            shutdownWorkerPool = true;
+            // If custom worker pool is provided, then use it, else create a new one.
+            if (configuration.getWorkerPool() != null) {
+                workerPool = configuration.getWorkerPool();
+                shutdownWorkerPool = false;
+            } else {
+                workerPool = endpoint.createProducerExecutor();
+                // we create a thread pool so we should also shut it down
+                shutdownWorkerPool = true;
+            }
         }
 
         // init client id which we may need to get from the kafka producer via reflection