You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/11 19:17:42 UTC

[camel] branch main updated: CAMEL-17913: camel-kafka - Add isolationLevel option for consumer

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ee230f90a3 CAMEL-17913: camel-kafka - Add isolationLevel option for consumer
0ee230f90a3 is described below

commit 0ee230f90a3dec7dcc44a93bc4ad8223d405c7c2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed May 11 21:17:33 2022 +0200

    CAMEL-17913: camel-kafka - Add isolationLevel option for consumer
---
 .../org/apache/camel/catalog/components/kafka.json |  2 ++
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++++++
 .../component/kafka/KafkaEndpointConfigurer.java   |  6 ++++++
 .../component/kafka/KafkaEndpointUriFactory.java   |  3 ++-
 .../org/apache/camel/component/kafka/kafka.json    |  2 ++
 .../camel/component/kafka/KafkaConfiguration.java  | 24 ++++++++++++++++++++--
 6 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index b7e0f5bb104..29422a48c6c 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -61,6 +61,7 @@
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
     "createConsumerBackoffInterval": { "kind": "property", "displayName": "Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to create the kafka consumer (kafka-client)." },
     "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": "Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum attempts to create the kafka consumer (kafka-client), before eventually giving up and failing. Error during creating the consumer may be fatal due to invalid configuration an [...]
+    "isolationLevel": { "kind": "property", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Control [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instanc [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
     "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": "Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to subscribe to the kafka broker." },
@@ -184,6 +185,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Contro [...]
     "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit insta [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...]
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index a9e413a5361..3be52693578 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -90,6 +90,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": getOrCreateConfiguration(target).setHeartbeatIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true;
         case "interceptorclasses":
         case "interceptorClasses": getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
+        case "isolationlevel":
+        case "isolationLevel": getOrCreateConfiguration(target).setIsolationLevel(property(camelContext, java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
@@ -308,6 +310,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return java.lang.Integer.class;
         case "interceptorclasses":
         case "interceptorClasses": return java.lang.String.class;
+        case "isolationlevel":
+        case "isolationLevel": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
@@ -522,6 +526,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "heartbeatIntervalMs": return getOrCreateConfiguration(target).getHeartbeatIntervalMs();
         case "interceptorclasses":
         case "interceptorClasses": return getOrCreateConfiguration(target).getInterceptorClasses();
+        case "isolationlevel":
+        case "isolationLevel": return getOrCreateConfiguration(target).getIsolationLevel();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
         case "kafkamanualcommitfactory":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index be6375442c4..bba6d9d9c94 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -80,6 +80,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": target.getConfiguration().setHeartbeatIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true;
         case "interceptorclasses":
         case "interceptorClasses": target.getConfiguration().setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
+        case "isolationlevel":
+        case "isolationLevel": target.getConfiguration().setIsolationLevel(property(camelContext, java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
@@ -282,6 +284,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": return java.lang.Integer.class;
         case "interceptorclasses":
         case "interceptorClasses": return java.lang.String.class;
+        case "isolationlevel":
+        case "isolationLevel": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
@@ -485,6 +489,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "heartbeatIntervalMs": return target.getConfiguration().getHeartbeatIntervalMs();
         case "interceptorclasses":
         case "interceptorClasses": return target.getConfiguration().getInterceptorClasses();
+        case "isolationlevel":
+        case "isolationLevel": return target.getConfiguration().getIsolationLevel();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
         case "kafkamanualcommitfactory":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 6bbcedab179..8e6b17ec99a 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(101);
+        Set<String> props = new HashSet<>(102);
         props.add("additionalProperties");
         props.add("allowManualCommit");
         props.add("autoCommitEnable");
@@ -52,6 +52,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
         props.add("headerSerializer");
         props.add("heartbeatIntervalMs");
         props.add("interceptorClasses");
+        props.add("isolationLevel");
         props.add("kafkaClientFactory");
         props.add("kafkaManualCommitFactory");
         props.add("kerberosBeforeReloginMinTime");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index b7e0f5bb104..29422a48c6c 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -61,6 +61,7 @@
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
     "createConsumerBackoffInterval": { "kind": "property", "displayName": "Create Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to create the kafka consumer (kafka-client)." },
     "createConsumerBackoffMaxAttempts": { "kind": "property", "displayName": "Create Consumer Backoff Max Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum attempts to create the kafka consumer (kafka-client), before eventually giving up and failing. Error during creating the consumer may be fatal due to invalid configuration an [...]
+    "isolationLevel": { "kind": "property", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Control [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instanc [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
     "subscribeConsumerBackoffInterval": { "kind": "property", "displayName": "Subscribe Consumer Backoff Interval", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The delay in millis seconds to wait before trying again to subscribe to the kafka broker." },
@@ -184,6 +185,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "isolationLevel": { "kind": "parameter", "displayName": "Isolation Level", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "read_uncommitted", "read_committed" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "read_uncommitted", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Contro [...]
     "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit insta [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index df624832871..7211084f567 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -145,6 +145,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
     @UriParam(label = "consumer", defaultValue = "5000", javaType = "java.time.Duration")
     private Long commitTimeoutMs = 5000L;
+    @UriParam(label = "consumer,advanced", defaultValue = "read_uncommitted", enums = "read_uncommitted,read_committed")
+    private String isolationLevel;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -466,6 +468,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotEmpty(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
         addPropertyIfNotEmpty(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
+        addPropertyIfNotEmpty(props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, getIsolationLevel());
         addPropertyIfNotEmpty(props, "schema.registry.url", getSchemaRegistryURL());
         addPropertyIfNotFalse(props, "specific.avro.reader", isSpecificAvroReader());
 
@@ -1804,10 +1807,27 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
 
     /**
      * The maximum time, in milliseconds, that the code will wait for a synchronous commit to complete
-     * 
-     * @param commitTimeoutMs
      */
     public void setCommitTimeoutMs(Long commitTimeoutMs) {
         this.commitTimeoutMs = commitTimeoutMs;
     }
+
+    public String getIsolationLevel() {
+        return isolationLevel;
+    }
+
+    /**
+     * Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return
+     * transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will
+     * return all messages, even transactional messages which have been aborted. Non-transactional messages will be
+     * returned unconditionally in either mode. Messages will always be returned in offset order. Hence, in
+     * read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the
+     * one less than the offset of the first open transaction. In particular any messages appearing after messages
+     * belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a
+     * result, read_committed</code> consumers will not be able to read up to the high watermark when there are in
+     * flight transactions. Further, when in read_committed the seekToEnd method will return the LSO
+     */
+    public void setIsolationLevel(String isolationLevel) {
+        this.isolationLevel = isolationLevel;
+    }
 }