You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/09 06:51:16 UTC

[camel] 01/03: CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a366f69fde4959546d00823e7eea6d3aa0492971
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 9 07:47:26 2021 +0100

    CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
---
 .../org/apache/camel/catalog/components/kafka.json         |  2 +-
 .../org/apache/camel/catalog/docs/kafka-component.adoc     |  2 +-
 .../camel/component/kafka/KafkaComponentConfigurer.java    |  5 +++++
 .../resources/org/apache/camel/component/kafka/kafka.json  |  2 +-
 components/camel-kafka/src/main/docs/kafka-component.adoc  |  2 +-
 .../camel/component/kafka/DefaultKafkaClientFactory.java   | 12 ++++++++++++
 .../apache/camel/component/kafka/KafkaClientFactory.java   | 10 ++++++++++
 .../org/apache/camel/component/kafka/KafkaComponent.java   |  8 ++------
 .../org/apache/camel/component/kafka/KafkaConsumer.java    | 14 +++-----------
 .../org/apache/camel/component/kafka/KafkaProducer.java    |  7 +++----
 .../apache/camel/component/kafka/KafkaConsumerTest.java    |  6 +++++-
 .../component/dsl/KafkaComponentBuilderFactory.java        |  4 +---
 docs/components/modules/ROOT/pages/kafka-component.adoc    |  2 +-
 13 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 79709da..55a1240 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after  [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 1e2481e..e44e7a1 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -225,6 +225,11 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
     }
 
     @Override
+    public String[] getAutowiredNames() {
+        return new String[]{"kafkaClientFactory"};
+    }
+
+    @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "additionalproperties":
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 79709da..55a1240 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -92,7 +92,7 @@
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after  [...]
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
-    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+    "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
     "synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
     "schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
     "interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index ee024bf..425788e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 public class DefaultKafkaClientFactory implements KafkaClientFactory {
+
     @Override
     public KafkaProducer getProducer(Properties kafkaProps) {
         return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
@@ -31,4 +33,14 @@ public class DefaultKafkaClientFactory implements KafkaClientFactory {
     public KafkaConsumer getConsumer(Properties kafkaProps) {
         return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
     }
+
+    @Override
+    public String getBrokers(KafkaConfiguration configuration) {
+        // broker urls is mandatory in this implementation
+        String brokers = configuration.getBrokers();
+        if (ObjectHelper.isEmpty(brokers)) {
+            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        }
+        return brokers;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index 4ab066f..d7de678 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -41,4 +41,14 @@ public interface KafkaClientFactory {
      * @return            an instance of Kafka consumer.
      */
     KafkaConsumer getConsumer(Properties kafkaProps);
+
+    /**
+     * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
+     * or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
+     *
+     * @param configuration the configuration
+     */
+    String getBrokers(KafkaConfiguration configuration);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index e0c0732..205f7a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private boolean useGlobalSslContextParameters;
     @Metadata(label = "consumer,advanced")
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
-    @Metadata(label = "advanced")
+    @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
 
     public KafkaComponent() {
@@ -119,11 +119,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     /**
      * Factory to use for creating {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
      * {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This allows to configure a custom factory to
-     * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
-     * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with logic that extends the vanilla Kafka
-     * clients.
-     *
-     * @param kafkaClientFactory factory instance to use.
+     * create instances with logic that extends the vanilla Kafka clients.
      */
     public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
         this.kafkaClientFactory = kafkaClientFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index eae650e..b911881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,7 +41,6 @@ import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +70,6 @@ public class KafkaConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
-
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (ObjectHelper.isEmpty(brokers)) {
-            throw new IllegalArgumentException("Brokers must be configured");
-        }
     }
 
     @Override
@@ -87,13 +81,11 @@ public class KafkaConsumer extends DefaultConsumer {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
         if (endpoint.getConfiguration().getGroupId() != null) {
             String groupId = endpoint.getConfiguration().getGroupId();
             props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2cf9a34..a8ef383 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,11 +70,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+        String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+        if (brokers != null) {
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
         }
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 
         return props;
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index aa6b6f5..e0a397d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,12 +20,14 @@ import org.apache.camel.Processor;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class KafkaConsumerTest {
 
     private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+    private KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
     private KafkaComponent component = mock(KafkaComponent.class);
     private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
     private Processor processor = mock(Processor.class);
@@ -35,8 +37,10 @@ public class KafkaConsumerTest {
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(configuration);
         when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+        when(component.getKafkaClientFactory()).thenReturn(clientFactory);
+        when(clientFactory.getBrokers(any())).thenReturn(null);
         assertThrows(IllegalArgumentException.class,
-                () -> new KafkaConsumer(endpoint, processor));
+                () -> new KafkaConsumer(endpoint, processor).getProps());
     }
 
     @Test
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 748b63d..9a8c878 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1374,9 +1374,7 @@ public interface KafkaComponentBuilderFactory {
          * Factory to use for creating
          * org.apache.kafka.clients.consumer.KafkaConsumer and
          * org.apache.kafka.clients.producer.KafkaProducer instances. This
-         * allows to configure a custom factory to create
-         * org.apache.kafka.clients.consumer.KafkaConsumer and
-         * org.apache.kafka.clients.producer.KafkaProducer instances with logic
+         * allows to configure a custom factory to create instances with logic
          * that extends the vanilla Kafka clients.
          * 
          * The option is a:
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index 8c91b52..c74f74c 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -120,7 +120,7 @@ The Kafka component supports 99 options, which are listed below.
 | *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
 | *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. |  | KafkaClientFactory
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
 | *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) |  | String
 | *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime |  | String