You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/02 11:10:36 UTC
[camel] 01/02: CAMEL-17262: camel-kafka - Allow to configure endpoint level KafkaManualCommitFactory
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a88a58a19f461f93dc5f85055ffadf15d8ca30c2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 2 12:08:42 2021 +0100
CAMEL-17262: camel-kafka - Allow to configure endpoint level KafkaManualCommitFactory
---
.../component/kafka/KafkaComponentConfigurer.java | 2 +-
.../component/kafka/KafkaEndpointConfigurer.java | 6 ++++
.../component/kafka/KafkaEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/kafka/kafka.json | 3 +-
.../camel/component/kafka/KafkaComponent.java | 7 ++--
.../camel/component/kafka/KafkaEndpoint.java | 18 ++++++++++
.../camel/component/kafka/KafkaFetchRecords.java | 2 +-
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 40 ++++++++++++++++++++++
8 files changed, 75 insertions(+), 6 deletions(-)
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 3353bdd..1d9b659 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -236,7 +236,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
@Override
public String[] getAutowiredNames() {
- return new String[]{"kafkaClientFactory","pollExceptionStrategy"};
+ return new String[]{"kafkaClientFactory","kafkaManualCommitFactory","pollExceptionStrategy"};
}
@Override
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 6b22e6a..f7e64e2 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -84,6 +84,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
case "interceptorClasses": target.getConfiguration().setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
case "kafkaclientfactory":
case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
+ case "kafkamanualcommitfactory":
+ case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true;
case "kerberosbeforereloginmintime":
case "kerberosBeforeReloginMinTime": target.getConfiguration().setKerberosBeforeReloginMinTime(property(camelContext, java.lang.Integer.class, value)); return true;
case "kerberosinitcmd":
@@ -288,6 +290,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
case "interceptorClasses": return java.lang.String.class;
case "kafkaclientfactory":
case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class;
+ case "kafkamanualcommitfactory":
+ case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
case "kerberosbeforereloginmintime":
case "kerberosBeforeReloginMinTime": return java.lang.Integer.class;
case "kerberosinitcmd":
@@ -493,6 +497,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
case "interceptorClasses": return target.getConfiguration().getInterceptorClasses();
case "kafkaclientfactory":
case "kafkaClientFactory": return target.getKafkaClientFactory();
+ case "kafkamanualcommitfactory":
+ case "kafkaManualCommitFactory": return target.getKafkaManualCommitFactory();
case "kerberosbeforereloginmintime":
case "kerberosBeforeReloginMinTime": return target.getConfiguration().getKerberosBeforeReloginMinTime();
case "kerberosinitcmd":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 4aa78a8..34e3b57 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(102);
+ Set<String> props = new HashSet<>(103);
props.add("synchronous");
props.add("queueBufferingMaxMessages");
props.add("allowManualCommit");
@@ -85,6 +85,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
props.add("deliveryTimeoutMs");
props.add("lazyStartProducer");
props.add("sslKeystorePassword");
+ props.add("kafkaManualCommitFactory");
props.add("sslEndpointAlgorithm");
props.add("resumeStrategy");
props.add("topic");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 1ca0c51..70b2bf4 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -61,7 +61,7 @@
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...]
"topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...]
"valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
- "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in ca [...]
+ "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in cas [...]
"pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
"bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...]
"compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...]
@@ -169,6 +169,7 @@
"valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...]
"exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+ "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in c [...]
"bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...]
"compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...]
"connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by thi [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2e7079b..75e0034 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -33,8 +33,8 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
private KafkaConfiguration configuration = new KafkaConfiguration();
@Metadata(label = "security", defaultValue = "false")
private boolean useGlobalSslContextParameters;
- @Metadata(label = "consumer,advanced")
- private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
+ @Metadata(autowired = true, label = "consumer,advanced")
+ private KafkaManualCommitFactory kafkaManualCommitFactory;
@Metadata(autowired = true, label = "advanced")
private KafkaClientFactory kafkaClientFactory;
@Metadata(autowired = true, label = "consumer,advanced")
@@ -154,5 +154,8 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
if (kafkaClientFactory == null) {
kafkaClientFactory = new DefaultKafkaClientFactory();
}
+ if (kafkaManualCommitFactory == null) {
+ kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
+ }
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5f04669..39efe6d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -54,6 +54,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
private KafkaConfiguration configuration = new KafkaConfiguration();
@UriParam(label = "advanced")
private KafkaClientFactory kafkaClientFactory;
+ @UriParam(label = "consumer,advanced")
+ private KafkaManualCommitFactory kafkaManualCommitFactory;
public KafkaEndpoint() {
}
@@ -88,6 +90,19 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
this.kafkaClientFactory = kafkaClientFactory;
}
+ public KafkaManualCommitFactory getKafkaManualCommitFactory() {
+ return kafkaManualCommitFactory;
+ }
+
+ /**
+ * Factory to use for creating {@link KafkaManualCommit} instances. This allows plugging in a custom factory to create
+ * custom {@link KafkaManualCommit} instances in case special logic is needed when doing manual commits that
+ * deviates from the default implementation that comes out of the box.
+ */
+ public void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory) {
+ this.kafkaManualCommitFactory = kafkaManualCommitFactory;
+ }
+
@Override
protected void doBuild() throws Exception {
super.doBuild();
@@ -95,6 +110,9 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
if (kafkaClientFactory == null) {
kafkaClientFactory = getComponent().getKafkaClientFactory();
}
+ if (kafkaManualCommitFactory == null) {
+ kafkaManualCommitFactory = getComponent().getKafkaManualCommitFactory();
+ }
}
@Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2a487d1..2ebd8bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -404,7 +404,7 @@ class KafkaFetchRecords implements Runnable {
kafkaConsumer.getEndpoint().getConfiguration(),
kafkaConsumer.getProcessor(),
consumer,
- kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory(), threadId, asyncCommits);
+ kafkaConsumer.getEndpoint().getKafkaManualCommitFactory(), threadId, asyncCommits);
}
private void seekToNextOffset(long partitionLastOffset) {
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index a73de06..e6765e6 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1851,6 +1851,46 @@ public interface KafkaEndpointBuilderFactory {
return this;
}
/**
+ * Factory to use for creating KafkaManualCommit instances. This allows
+ * plugging in a custom factory to create custom KafkaManualCommit
+ * instances in case special logic is needed when doing manual commits
+ * that deviates from the default implementation that comes out of the
+ * box.
+ *
+ * The option is a:
+ * <code>org.apache.camel.component.kafka.KafkaManualCommitFactory</code> type.
+ *
+ * Group: consumer (advanced)
+ *
+ * @param kafkaManualCommitFactory the value to set
+ * @return the dsl builder
+ */
+ default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory(
+ Object kafkaManualCommitFactory) {
+ doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory);
+ return this;
+ }
+ /**
+ * Factory to use for creating KafkaManualCommit instances. This allows
+ * plugging in a custom factory to create custom KafkaManualCommit
+ * instances in case special logic is needed when doing manual commits
+ * that deviates from the default implementation that comes out of the
+ * box.
+ *
+ * The option will be converted to a
+ * <code>org.apache.camel.component.kafka.KafkaManualCommitFactory</code> type.
+ *
+ * Group: consumer (advanced)
+ *
+ * @param kafkaManualCommitFactory the value to set
+ * @return the dsl builder
+ */
+ default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory(
+ String kafkaManualCommitFactory) {
+ doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory);
+ return this;
+ }
+ /**
* Factory to use for creating
* org.apache.kafka.clients.consumer.KafkaConsumer and
* org.apache.kafka.clients.producer.KafkaProducer instances. This