Posted to commits@camel.apache.org by da...@apache.org on 2021/02/09 06:51:15 UTC

[camel] branch master updated (d0082cb -> c71771c)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from d0082cb  CAMEL-16094 camel-milo: connection caching.
     new a366f69  CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
     new 7c5bf3b  Regen
     new c71771c  CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/catalog/components/kafka.json         |  2 +-
 .../org/apache/camel/catalog/docs/kafka-component.adoc     |  2 +-
 .../camel/component/kafka/KafkaComponentConfigurer.java    |  5 +++++
 .../resources/org/apache/camel/component/kafka/kafka.json  |  2 +-
 components/camel-kafka/src/main/docs/kafka-component.adoc  |  2 +-
 .../camel/component/kafka/DefaultKafkaClientFactory.java   | 12 ++++++++++++
 .../apache/camel/component/kafka/KafkaClientFactory.java   | 10 ++++++++++
 .../org/apache/camel/component/kafka/KafkaComponent.java   |  8 ++------
 .../org/apache/camel/component/kafka/KafkaConsumer.java    | 14 +++-----------
 .../org/apache/camel/component/kafka/KafkaProducer.java    |  7 +++----
 .../apache/camel/component/kafka/KafkaConsumerTest.java    |  6 +++++-
 .../milo/client/MiloClientCachingConnectionManager.java    |  4 ++--
 .../component/dsl/KafkaComponentBuilderFactory.java        |  4 +---
 docs/components/modules/ROOT/pages/kafka-component.adoc    |  2 +-
 14 files changed, 48 insertions(+), 32 deletions(-)


[camel] 03/03: CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs


davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c71771cfb06d74b7c326c84532fbe9990d9cf637
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 9 07:50:27 2021 +0100

    CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
---
 .../test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index e0a397d..eff7f14 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -38,7 +38,7 @@ public class KafkaConsumerTest {
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
         when(component.getKafkaClientFactory()).thenReturn(clientFactory);
-        when(clientFactory.getBrokers(any())).thenReturn(null);
+        when(clientFactory.getBrokers(any())).thenThrow(new IllegalArgumentException());
         assertThrows(IllegalArgumentException.class,
                 () -> new KafkaConsumer(endpoint, processor).getProps());
     }


[camel] 02/03: Regen


davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7c5bf3b17c505363ffa523f95c1d62b001b71f2b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 9 07:48:39 2021 +0100

    Regen
---
 .../component/milo/client/MiloClientCachingConnectionManager.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientCachingConnectionManager.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientCachingConnectionManager.java
index c3662a2..e9b4f8c 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientCachingConnectionManager.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientCachingConnectionManager.java
@@ -25,8 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A caching {@link MiloClientConnectionManager} which cache and reuses the same {@link MiloClientConnection}
- * for clients with the same cache id ({@link MiloClientConfiguration#toCacheId()}.
+ * A caching {@link MiloClientConnectionManager} which cache and reuses the same {@link MiloClientConnection} for
+ * clients with the same cache id ({@link MiloClientConfiguration#toCacheId()}.
  */
 public class MiloClientCachingConnectionManager implements MiloClientConnectionManager {
 


[camel] 01/03: CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs


davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a366f69fde4959546d00823e7eea6d3aa0492971
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 9 07:47:26 2021 +0100

    CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
---
 .../org/apache/camel/catalog/components/kafka.json         |  2 +-
 .../org/apache/camel/catalog/docs/kafka-component.adoc     |  2 +-
 .../camel/component/kafka/KafkaComponentConfigurer.java    |  5 +++++
 .../resources/org/apache/camel/component/kafka/kafka.json  |  2 +-
 components/camel-kafka/src/main/docs/kafka-component.adoc  |  2 +-
 .../camel/component/kafka/DefaultKafkaClientFactory.java   | 12 ++++++++++++
 .../apache/camel/component/kafka/KafkaClientFactory.java   | 10 ++++++++++
 .../org/apache/camel/component/kafka/KafkaComponent.java   |  8 ++------
 .../org/apache/camel/component/kafka/KafkaConsumer.java    | 14 +++-----------
 .../org/apache/camel/component/kafka/KafkaProducer.java    |  7 +++----
 .../apache/camel/component/kafka/KafkaConsumerTest.java    |  6 +++++-
 .../component/dsl/KafkaComponentBuilderFactory.java        |  4 +---
 docs/components/modules/ROOT/pages/kafka-component.adoc    |  2 +-
 13 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 79709da..55a1240 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after  [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 1e2481e..e44e7a1 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -225,6 +225,11 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
     }
 
     @Override
+    public String[] getAutowiredNames() {
+        return new String[]{"kafkaClientFactory"};
+    }
+
+    @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "additionalproperties":
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 79709da..55a1240 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after  [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index ee024bf..425788e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 public class DefaultKafkaClientFactory implements KafkaClientFactory {
+
     @Override
     public KafkaProducer getProducer(Properties kafkaProps) {
         return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
@@ -31,4 +33,14 @@ public class DefaultKafkaClientFactory implements KafkaClientFactory {
     public KafkaConsumer getConsumer(Properties kafkaProps) {
         return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
     }
+
+    @Override
+    public String getBrokers(KafkaConfiguration configuration) {
+        // broker urls is mandatory in this implementation
+        String brokers = configuration.getBrokers();
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        }
+        return brokers;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index 4ab066f..d7de678 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -41,4 +41,14 @@ public interface KafkaClientFactory {
      * @return            an instance of Kafka consumer.
      */
     KafkaConsumer getConsumer(Properties kafkaProps);
+
+    /**
+     * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
+     * or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
+     *
+     * @param configuration the configuration
+     */
+    String getBrokers(KafkaConfiguration configuration);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index e0c0732..205f7a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private boolean useGlobalSslContextParameters;
     @Metadata(label = "consumer,advanced")
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
-    @Metadata(label = "advanced")
+    @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
 
     public KafkaComponent() {
@@ -119,11 +119,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     /**
      * Factory to use for creating {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
      * {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This allows to configure a custom factory to
-     * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
-     * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with logic that extends the vanilla Kafka
-     * clients.
-     *
-     * @param kafkaClientFactory factory instance to use.
+     * create instances with logic that extends the vanilla Kafka clients.
      */
     public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
         this.kafkaClientFactory = kafkaClientFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index eae650e..b911881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,7 +41,6 @@ import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +70,6 @@ public class KafkaConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
-
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (ObjectHelper.isEmpty(brokers)) {
-            throw new IllegalArgumentException("Brokers must be configured");
-        }
     }
 
     @Override
@@ -87,13 +81,11 @@ public class KafkaConsumer extends DefaultConsumer {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
         if (endpoint.getConfiguration().getGroupId() != null) {
             String groupId = endpoint.getConfiguration().getGroupId();
             props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2cf9a34..a8ef383 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,11 +70,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 
         return props;
     }
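
Editorial sketch (not part of the commit): the KafkaConsumer and KafkaProducer hunks above move broker resolution into the client factory and only set bootstrap.servers when the factory returns a non-null value. The following is a minimal, self-contained illustration of that control flow. Config, ClientFactory, DefaultFactory, ManagedFactory and buildProps are hypothetical stand-ins rather than the Camel classes, and the empty check only approximates ObjectHelper.isEmpty.

```java
import java.util.Properties;

// Hypothetical stand-ins for the Camel types named in the diff. The real
// KafkaClientFactory also declares getProducer and getConsumer, omitted here.
class BrokerResolutionSketch {

    /** Stand-in for KafkaConfiguration: only the brokers option is modelled. */
    static final class Config {
        private final String brokers;

        Config(String brokers) {
            this.brokers = brokers;
        }

        String getBrokers() {
            return brokers;
        }
    }

    /** Stand-in for the getBrokers part of KafkaClientFactory. */
    interface ClientFactory {
        String getBrokers(Config configuration);
    }

    /** Follows DefaultKafkaClientFactory in the diff: brokers are mandatory. */
    static final class DefaultFactory implements ClientFactory {
        @Override
        public String getBrokers(Config configuration) {
            String brokers = configuration.getBrokers();
            // Assumption: a null/blank check approximates ObjectHelper.isEmpty
            if (brokers == null || brokers.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "URL to the Kafka brokers must be configured with the brokers option.");
            }
            return brokers;
        }
    }

    /** Hypothetical custom factory whose clients find their brokers elsewhere. */
    static final class ManagedFactory implements ClientFactory {
        @Override
        public String getBrokers(Config configuration) {
            return null; // no explicit broker URLs
        }
    }

    /** Follows the new getProps logic: set bootstrap.servers only when non-null. */
    static Properties buildProps(ClientFactory factory, Config configuration) {
        Properties props = new Properties();
        String brokers = factory.getBrokers(configuration);
        if (brokers != null) {
            props.put("bootstrap.servers", brokers);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps(new DefaultFactory(), new Config("host1:9092,host2:9092")));
        System.out.println(buildProps(new ManagedFactory(), new Config(null)));
    }
}
```

With the default factory a missing brokers option still fails fast, which is the behaviour the updated KafkaConsumerTest in this push exercises by having the mocked factory throw IllegalArgumentException.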
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index aa6b6f5..e0a397d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,12 +20,14 @@ import org.apache.camel.Processor;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
     private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
@@ -35,8 +37,10 @@ public class KafkaConsumerTest {
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+        when(component.getKafkaClientFactory()).thenReturn(clientFactory);
+        when(clientFactory.getBrokers(any())).thenReturn(null);
         assertThrows(IllegalArgumentException.class,
-                () -> new KafkaConsumer(endpoint, processor));
+                () -> new KafkaConsumer(endpoint, processor).getProps());
     }
 
     @Test
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 748b63d..9a8c878 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1374,9 +1374,7 @@ public interface KafkaComponentBuilderFactory {
          * Factory to use for creating
          * org.apache.kafka.clients.consumer.KafkaConsumer and
          * org.apache.kafka.clients.producer.KafkaProducer instances. This
-         * allows to configure a custom factory to create
-         * org.apache.kafka.clients.consumer.KafkaConsumer and
-         * org.apache.kafka.clients.producer.KafkaProducer instances with logic
+         * allows to configure a custom factory to create instances with logic
          * that extends the vanilla Kafka clients.
          * 
          * The option is a:
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index 8c91b52..c74f74c 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -120,7 +120,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String
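
Editorial sketch (not part of the commit): the autowiredEnabled description above says an autowired option is filled by looking up the registry for a single instance of the matching type, and this commit marks kafkaClientFactory as autowired. The following is a simplified model of that lookup rule only. The Map-based registry, ClientFactory, Component, findSingleByType and autowire are hypothetical names for illustration and are not Camel's implementation; how Camel treats an explicitly configured value is not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of "autowire when exactly one instance of the type exists".
// All types here are hypothetical stand-ins, not the Camel API.
class AutowireSketch {

    /** Marker stand-in for KafkaClientFactory. */
    interface ClientFactory {
    }

    static final class DefaultFactory implements ClientFactory {
    }

    static final class CustomFactory implements ClientFactory {
    }

    /** Stand-in for a component holding an autowired option with a default. */
    static final class Component {
        ClientFactory clientFactory = new DefaultFactory();
    }

    /** Returns the bean of the given type only if the registry holds exactly one. */
    static <T> Optional<T> findSingleByType(Map<String, Object> registry, Class<T> type) {
        T match = null;
        int count = 0;
        for (Object bean : registry.values()) {
            if (type.isInstance(bean)) {
                match = type.cast(bean);
                count++;
            }
        }
        return count == 1 ? Optional.of(match) : Optional.empty();
    }

    /** Applies the single matching factory, if any; otherwise keeps the default. */
    static void autowire(Component component, Map<String, Object> registry) {
        findSingleByType(registry, ClientFactory.class)
                .ifPresent(factory -> component.clientFactory = factory);
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("myFactory", new CustomFactory());

        Component component = new Component();
        autowire(component, registry);
        System.out.println(component.clientFactory.getClass().getSimpleName());
    }
}
```

In this model an empty registry, or one holding several candidates, leaves the component's default factory in place.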