You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/02 11:10:36 UTC

[camel] 01/02: CAMEL-17262: camel-kafka - Allow to configure endpoint level KafkaManualCommitFactory

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a88a58a19f461f93dc5f85055ffadf15d8ca30c2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 2 12:08:42 2021 +0100

    CAMEL-17262: camel-kafka - Allow to configure endpoint level KafkaManualCommitFactory
---
 .../component/kafka/KafkaComponentConfigurer.java  |  2 +-
 .../component/kafka/KafkaEndpointConfigurer.java   |  6 ++++
 .../component/kafka/KafkaEndpointUriFactory.java   |  3 +-
 .../org/apache/camel/component/kafka/kafka.json    |  3 +-
 .../camel/component/kafka/KafkaComponent.java      |  7 ++--
 .../camel/component/kafka/KafkaEndpoint.java       | 18 ++++++++++
 .../camel/component/kafka/KafkaFetchRecords.java   |  2 +-
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 40 ++++++++++++++++++++++
 8 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 3353bdd..1d9b659 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -236,7 +236,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
 
     @Override
     public String[] getAutowiredNames() {
-        return new String[]{"kafkaClientFactory","pollExceptionStrategy"};
+        return new String[]{"kafkaClientFactory","kafkaManualCommitFactory","pollExceptionStrategy"};
     }
 
     @Override
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 6b22e6a..f7e64e2 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -84,6 +84,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "interceptorClasses": target.getConfiguration().setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": target.getConfiguration().setKerberosBeforeReloginMinTime(property(camelContext, java.lang.Integer.class, value)); return true;
         case "kerberosinitcmd":
@@ -288,6 +290,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "interceptorClasses": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class;
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": return java.lang.Integer.class;
         case "kerberosinitcmd":
@@ -493,6 +497,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "interceptorClasses": return target.getConfiguration().getInterceptorClasses();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
+        case "kafkamanualcommitfactory":
+        case "kafkaManualCommitFactory": return target.getKafkaManualCommitFactory();
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": return target.getConfiguration().getKerberosBeforeReloginMinTime();
         case "kerberosinitcmd":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 4aa78a8..34e3b57 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(102);
+        Set<String> props = new HashSet<>(103);
         props.add("synchronous");
         props.add("queueBufferingMaxMessages");
         props.add("allowManualCommit");
@@ -85,6 +85,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
         props.add("deliveryTimeoutMs");
         props.add("lazyStartProducer");
         props.add("sslKeystorePassword");
+        props.add("kafkaManualCommitFactory");
         props.add("sslEndpointAlgorithm");
         props.add("resumeStrategy");
         props.add("topic");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 1ca0c51..70b2bf4 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -61,7 +61,7 @@
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
-    "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in ca [...]
+    "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in cas [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...]
@@ -169,6 +169,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in c [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...]
     "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by thi [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2e7079b..75e0034 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -33,8 +33,8 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @Metadata(label = "security", defaultValue = "false")
     private boolean useGlobalSslContextParameters;
-    @Metadata(label = "consumer,advanced")
-    private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
+    @Metadata(autowired = true, label = "consumer,advanced")
+    private KafkaManualCommitFactory kafkaManualCommitFactory;
     @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory;
     @Metadata(autowired = true, label = "consumer,advanced")
@@ -154,5 +154,8 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         if (kafkaClientFactory == null) {
             kafkaClientFactory = new DefaultKafkaClientFactory();
         }
+        if (kafkaManualCommitFactory == null) {
+            kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
+        }
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 5f04669..39efe6d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -54,6 +54,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam(label = "advanced")
     private KafkaClientFactory kafkaClientFactory;
+    @UriParam(label = "consumer,advanced")
+    private KafkaManualCommitFactory kafkaManualCommitFactory;
 
     public KafkaEndpoint() {
     }
@@ -88,6 +90,19 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         this.kafkaClientFactory = kafkaClientFactory;
     }
 
+    public KafkaManualCommitFactory getKafkaManualCommitFactory() {
+        return kafkaManualCommitFactory;
+    }
+
+    /**
+     * Factory to use for creating {@link KafkaManualCommit} instances. This allows to plugin a custom factory to create
+     * custom {@link KafkaManualCommit} instances in case special logic is needed when doing manual commits that
+     * deviates from the default implementation that comes out of the box.
+     */
+    public void setKafkaManualCommitFactory(KafkaManualCommitFactory kafkaManualCommitFactory) {
+        this.kafkaManualCommitFactory = kafkaManualCommitFactory;
+    }
+
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
@@ -95,6 +110,9 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         if (kafkaClientFactory == null) {
             kafkaClientFactory = getComponent().getKafkaClientFactory();
         }
+        if (kafkaManualCommitFactory == null) {
+            kafkaManualCommitFactory = getComponent().getKafkaManualCommitFactory();
+        }
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2a487d1..2ebd8bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -404,7 +404,7 @@ class KafkaFetchRecords implements Runnable {
                 kafkaConsumer.getEndpoint().getConfiguration(),
                 kafkaConsumer.getProcessor(),
                 consumer,
-                kafkaConsumer.getEndpoint().getComponent().getKafkaManualCommitFactory(), threadId, asyncCommits);
+                kafkaConsumer.getEndpoint().getKafkaManualCommitFactory(), threadId, asyncCommits);
     }
 
     private void seekToNextOffset(long partitionLastOffset) {
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index a73de06..e6765e6 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1851,6 +1851,46 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * Factory to use for creating KafkaManualCommit instances. This allows
+         * to plugin a custom factory to create custom KafkaManualCommit
+         * instances in case special logic is needed when doing manual commits
+         * that deviates from the default implementation that comes out of the
+         * box.
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.KafkaManualCommitFactory&lt;/code&gt; type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param kafkaManualCommitFactory the value to set
+         * @return the dsl builder
+         */
+        default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory(
+                Object kafkaManualCommitFactory) {
+            doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory);
+            return this;
+        }
+        /**
+         * Factory to use for creating KafkaManualCommit instances. This allows
+         * to plugin a custom factory to create custom KafkaManualCommit
+         * instances in case special logic is needed when doing manual commits
+         * that deviates from the default implementation that comes out of the
+         * box.
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;org.apache.camel.component.kafka.KafkaManualCommitFactory&lt;/code&gt; type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param kafkaManualCommitFactory the value to set
+         * @return the dsl builder
+         */
+        default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory(
+                String kafkaManualCommitFactory) {
+            doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory);
+            return this;
+        }
+        /**
          * Factory to use for creating
          * org.apache.kafka.clients.consumer.KafkaConsumer and
          * org.apache.kafka.clients.producer.KafkaProducer instances. This