Posted to commits@camel.apache.org by nf...@apache.org on 2023/07/26 14:13:32 UTC
[camel] 01/01: CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch CAMEL-19650/3.20
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 367b98b0e635b002b5ab3a95f5a0a28ce1aaab6c
Author: Kartik kalaghatgi <ka...@gmail.com>
AuthorDate: Wed Jul 26 19:35:10 2023 +0530
CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration (#10827)
* CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
When the Camel Kafka component is given a custom worker pool, the pool is ignored and a new one is still created. This is fixed by adding a check that uses the custom worker pool if one is provided.
* CAMEL-19650: Camel Kafka doesn't honor 'workerPool' configuration
Fixing checkstyle issue
---
.../java/org/apache/camel/component/kafka/KafkaProducer.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 95afbde2960..d1dcaa30bbe 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -169,9 +169,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
         // if we are in asynchronous mode we need a worker pool
         if (!configuration.isSynchronous() && workerPool == null) {
-            workerPool = endpoint.createProducerExecutor();
-            // we create a thread pool so we should also shut it down
-            shutdownWorkerPool = true;
+            // If custom worker pool is provided, then use it, else create a new one.
+            if (configuration.getWorkerPool() != null) {
+                workerPool = configuration.getWorkerPool();
+                shutdownWorkerPool = false;
+            } else {
+                workerPool = endpoint.createProducerExecutor();
+                // we create a thread pool so we should also shut it down
+                shutdownWorkerPool = true;
+            }
         }
         // init client id which we may need to get from the kafka producer via reflection
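The ownership rule in the hunk above (use the configured pool if there is one and leave its lifecycle to the caller, otherwise create a pool and shut it down later) can be sketched in isolation. This is a minimal illustration, not Camel code: the class `WorkerPoolOwnership`, its constructor argument, and the `stop()` method are hypothetical, and the assumption that `shutdownWorkerPool` guards the shutdown on stop is inferred from the comment in the diff rather than shown in it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the producer in the diff; none of this is Camel's API.
public class WorkerPoolOwnership {
    private final ExecutorService configuredPool; // custom pool from configuration, may be null
    private ExecutorService workerPool;
    private boolean shutdownWorkerPool;

    public WorkerPoolOwnership(ExecutorService configuredPool) {
        this.configuredPool = configuredPool;
    }

    public void start() {
        if (workerPool == null) {
            if (configuredPool != null) {
                // The caller supplied the pool, so the caller owns its lifecycle.
                workerPool = configuredPool;
                shutdownWorkerPool = false;
            } else {
                // We create the pool, so we are responsible for shutting it down.
                workerPool = Executors.newFixedThreadPool(2);
                shutdownWorkerPool = true;
            }
        }
    }

    // Assumed counterpart to start(): only shut down a pool we created ourselves.
    public void stop() {
        if (workerPool != null && shutdownWorkerPool) {
            workerPool.shutdown();
        }
        workerPool = null;
    }

    public ExecutorService getWorkerPool() {
        return workerPool;
    }

    public static void main(String[] args) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        WorkerPoolOwnership custom = new WorkerPoolOwnership(shared);
        custom.start();
        custom.stop();
        System.out.println("shared pool shut down: " + shared.isShutdown()); // false
        shared.shutdown(); // the caller cleans up its own pool

        WorkerPoolOwnership owned = new WorkerPoolOwnership(null);
        owned.start();
        ExecutorService created = owned.getWorkerPool();
        owned.stop();
        System.out.println("created pool shut down: " + created.isShutdown()); // true
    }
}
```

Setting the flag to false for a supplied pool matters when that pool is shared: shutting it down on stop would break every other user of the same executor.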