You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/09 06:51:16 UTC
[camel] 01/03: CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a366f69fde4959546d00823e7eea6d3aa0492971
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 9 07:47:26 2021 +0100
CAMEL-16138: Allow KafkaClientFactory to be used without explicit broker URLs
---
.../org/apache/camel/catalog/components/kafka.json | 2 +-
.../org/apache/camel/catalog/docs/kafka-component.adoc | 2 +-
.../camel/component/kafka/KafkaComponentConfigurer.java | 5 +++++
.../resources/org/apache/camel/component/kafka/kafka.json | 2 +-
components/camel-kafka/src/main/docs/kafka-component.adoc | 2 +-
.../camel/component/kafka/DefaultKafkaClientFactory.java | 12 ++++++++++++
.../apache/camel/component/kafka/KafkaClientFactory.java | 10 ++++++++++
.../org/apache/camel/component/kafka/KafkaComponent.java | 8 ++------
.../org/apache/camel/component/kafka/KafkaConsumer.java | 14 +++-----------
.../org/apache/camel/component/kafka/KafkaProducer.java | 7 +++----
.../apache/camel/component/kafka/KafkaConsumerTest.java | 6 +++++-
.../component/dsl/KafkaComponentBuilderFactory.java | 4 +---
docs/components/modules/ROOT/pages/kafka-component.adoc | 2 +-
13 files changed, 46 insertions(+), 30 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 79709da..55a1240 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -92,7 +92,7 @@
"workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...]
"workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
"autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
- "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+ "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
"synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
"schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
"interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
| *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
| *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
| *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
| *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
| *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 1e2481e..e44e7a1 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -225,6 +225,11 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
}
@Override
+ public String[] getAutowiredNames() {
+ return new String[]{"kafkaClientFactory"};
+ }
+
+ @Override
public Class<?> getOptionType(String name, boolean ignoreCase) {
switch (ignoreCase ? name.toLowerCase() : name) {
case "additionalproperties":
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 79709da..55a1240 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -92,7 +92,7 @@
"workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...]
"workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Maximum number of threads for the worker pool for continue routing Exchange after [...]
"autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
- "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to c [...]
+ "kafkaClientFactory": { "kind": "property", "displayName": "Kafka Client Factory", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaClientFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to cr [...]
"synchronous": { "kind": "property", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether synchronous processing should be strictly used" },
"schemaRegistryURL": { "kind": "property", "displayName": "Schema Registry URL", "group": "confluent", "label": "confluent", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. Thi [...]
"interceptorClasses": { "kind": "property", "displayName": "Interceptor Classes", "group": "monitoring", "label": "common,monitoring", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets interceptors for producer or consumers. Producer interceptors have to be classes implemen [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 85c4147..cfaf481 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -118,7 +118,7 @@ The Kafka component supports 99 options, which are listed below.
| *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
| *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
| *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
| *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
| *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index ee024bf..425788e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
+import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
public class DefaultKafkaClientFactory implements KafkaClientFactory {
+
@Override
public KafkaProducer getProducer(Properties kafkaProps) {
return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
@@ -31,4 +33,14 @@ public class DefaultKafkaClientFactory implements KafkaClientFactory {
public KafkaConsumer getConsumer(Properties kafkaProps) {
return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
}
+
+ @Override
+ public String getBrokers(KafkaConfiguration configuration) {
+ // broker URLs are mandatory in this implementation
+ String brokers = configuration.getBrokers();
+ if (ObjectHelper.isEmpty(brokers)) {
+ throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+ }
+ return brokers;
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index 4ab066f..d7de678 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -41,4 +41,14 @@ public interface KafkaClientFactory {
* @return an instance of Kafka consumer.
*/
KafkaConsumer getConsumer(Properties kafkaProps);
+
+ /**
+ * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
+ * or a VIP pointing to a subset of brokers.
+ * <p/>
+ * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
+ *
+ * @param configuration the configuration
+ */
+ String getBrokers(KafkaConfiguration configuration);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index e0c0732..205f7a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
private boolean useGlobalSslContextParameters;
@Metadata(label = "consumer,advanced")
private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
- @Metadata(label = "advanced")
+ @Metadata(autowired = true, label = "advanced")
private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
public KafkaComponent() {
@@ -119,11 +119,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
/**
* Factory to use for creating {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
* {@link org.apache.kafka.clients.producer.KafkaProducer} instances. This allows to configure a custom factory to
- * create {@link org.apache.kafka.clients.consumer.KafkaConsumer} and
- * {@link org.apache.kafka.clients.producer.KafkaProducer} instances with logic that extends the vanilla Kafka
- * clients.
- *
- * @param kafkaClientFactory factory instance to use.
+ * create instances with logic that extends the vanilla Kafka clients.
*/
public void setKafkaClientFactory(KafkaClientFactory kafkaClientFactory) {
this.kafkaClientFactory = kafkaClientFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index eae650e..b911881 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,7 +41,6 @@ import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,11 +70,6 @@ public class KafkaConsumer extends DefaultConsumer {
this.endpoint = endpoint;
this.processor = processor;
this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
-
- String brokers = endpoint.getConfiguration().getBrokers();
- if (ObjectHelper.isEmpty(brokers)) {
- throw new IllegalArgumentException("Brokers must be configured");
- }
}
@Override
@@ -87,13 +81,11 @@ public class KafkaConsumer extends DefaultConsumer {
Properties props = endpoint.getConfiguration().createConsumerProperties();
endpoint.updateClassProperties(props);
- String brokers = endpoint.getConfiguration().getBrokers();
- if (brokers == null) {
- throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+ String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+ if (brokers != null) {
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
}
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-
if (endpoint.getConfiguration().getGroupId() != null) {
String groupId = endpoint.getConfiguration().getGroupId();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2cf9a34..a8ef383 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,11 +70,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
Properties props = endpoint.getConfiguration().createProducerProperties();
endpoint.updateClassProperties(props);
- String brokers = endpoint.getConfiguration().getBrokers();
- if (brokers == null) {
- throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
+ String brokers = endpoint.getComponent().getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
+ if (brokers != null) {
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
}
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return props;
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index aa6b6f5..e0a397d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,12 +20,14 @@ import org.apache.camel.Processor;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class KafkaConsumerTest {
private KafkaConfiguration configuration = mock(KafkaConfiguration.class);
+ private KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
private KafkaComponent component = mock(KafkaComponent.class);
private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
private Processor processor = mock(Processor.class);
@@ -35,8 +37,10 @@ public class KafkaConsumerTest {
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(configuration);
when(endpoint.getConfiguration().getGroupId()).thenReturn("groupOne");
+ when(component.getKafkaClientFactory()).thenReturn(clientFactory);
+ when(clientFactory.getBrokers(any())).thenReturn(null);
assertThrows(IllegalArgumentException.class,
- () -> new KafkaConsumer(endpoint, processor));
+ () -> new KafkaConsumer(endpoint, processor).getProps());
}
@Test
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 748b63d..9a8c878 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1374,9 +1374,7 @@ public interface KafkaComponentBuilderFactory {
* Factory to use for creating
* org.apache.kafka.clients.consumer.KafkaConsumer and
* org.apache.kafka.clients.producer.KafkaProducer instances. This
- * allows to configure a custom factory to create
- * org.apache.kafka.clients.consumer.KafkaConsumer and
- * org.apache.kafka.clients.producer.KafkaProducer instances with logic
+ * allows to configure a custom factory to create instances with logic
* that extends the vanilla Kafka clients.
*
* The option is a:
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index 8c91b52..c74f74c 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -120,7 +120,7 @@ The Kafka component supports 99 options, which are listed below.
| *workerPoolCoreSize* (producer) | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 10 | Integer
| *workerPoolMaxSize* (producer) | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. | 20 | Integer
| *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
-| *kafkaClientFactory* (advanced) | Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
+| *kafkaClientFactory* (advanced) | *Autowired* Factory to use for creating org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.clients.producer.KafkaProducer instances. This allows to configure a custom factory to create instances with logic that extends the vanilla Kafka clients. | | KafkaClientFactory
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used | false | boolean
| *schemaRegistryURL* (confluent) | URL of the Confluent Platform schema registry servers to use. The format is host1:port1,host2:port2. This is known as schema.registry.url in the Confluent Platform documentation. This option is only available in the Confluent Platform (not standard Apache Kafka) | | String
| *interceptorClasses* (monitoring) | Sets interceptors for producer or consumers. Producer interceptors have to be classes implementing org.apache.kafka.clients.producer.ProducerInterceptor Consumer interceptors have to be classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor Note that if you use Producer interceptor on a consumer it will throw a class cast exception in runtime | | String