You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/12/01 19:24:12 UTC

[camel-kafka-connector] branch master updated (cd9356d -> 3d9e8ee)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from cd9356d  Regen
     new b120dab  Support Poll timeout and key cache size as option for kafka idempotent repository
     new 3d9e8ee  Support Poll timeout and key cache size as option for kafka idempotent repository

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/kafkaconnector/CamelConnectorConfig.java  |  8 ++++++++
 .../camel/kafkaconnector/CamelSinkConnectorConfig.java     |  4 +++-
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java     |  4 ++++
 .../camel/kafkaconnector/CamelSourceConnectorConfig.java   |  4 +++-
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java   |  4 ++++
 .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java  | 14 +++++++++++++-
 6 files changed, 35 insertions(+), 3 deletions(-)


[camel-kafka-connector] 02/02: Support Poll timeout and key cache size as option for kafka idempotent repository

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3d9e8ee4054727449443a7efe0eb7a2b9330a272
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Dec 1 18:15:44 2020 +0100

    Support Poll timeout and key cache size as option for kafka idempotent repository
---
 .../java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 3d922fb..bf766b7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -65,7 +65,6 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
     


[camel-kafka-connector] 01/02: Support Poll timeout and key cache size as option for kafka idempotent repository

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b120dabac70faa2f0b6dca0d6c399e432ed8673e
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Dec 1 18:08:39 2020 +0100

    Support Poll timeout and key cache size as option for kafka idempotent repository
---
 .../apache/camel/kafkaconnector/CamelConnectorConfig.java  |  8 ++++++++
 .../camel/kafkaconnector/CamelSinkConnectorConfig.java     |  5 ++++-
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java     |  4 ++++
 .../camel/kafkaconnector/CamelSourceConnectorConfig.java   |  4 +++-
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java   |  4 ++++
 .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java  | 14 +++++++++++++-
 6 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 42c9a85..1b5e014 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -75,6 +75,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers";
     public static final String  CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live"; 
     
+    public static final int CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT = 1000;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF = "camel.idempotency.kafka.max.cache.size";
+    public static final String  CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC = "Sets the maximum size of the local key cache";
+    
+    public static final int CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT = 100;
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF = "camel.idempotency.kafka.poll.duration.ms";
+    public static final String  CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC = "Sets the poll duration (in milliseconds) of the Kafka consumer";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 26f9a6f..3d922fb 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -64,7 +64,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 21b6aa4..e4c9341 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -92,6 +92,8 @@ public class CamelSinkTask extends SinkTask {
             final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
             final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
             final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
+            final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
+            final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -118,6 +120,8 @@ public class CamelSinkTask extends SinkTask {
                 .withIdempotentRepositoryType(idempotentRepositoryType)
                 .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
                 .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
+                .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
+                .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index edc8e41..5d93e50 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -100,7 +100,9 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index f196c25..7848af4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -98,6 +98,8 @@ public class CamelSourceTask extends SourceTask {
             final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
             final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
             final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
+            final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
+            final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
             
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -127,6 +129,8 @@ public class CamelSourceTask extends SourceTask {
                 .withIdempotentRepositoryType(idempotentRepositoryType)
                 .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
                 .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
+                .withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
+                .withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index c926bbc..982d7d7 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -104,6 +104,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String idempotentRepositoryType;
         private String idempotentRepositoryTopicName;
         private String idempotentRepositoryKafkaServers;
+        private int idempotentRepositoryKafkaMaxCacheSize;
+        private int idempotentRepositoryKafkaPollDuration;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -184,6 +186,16 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
             return this;
         }
+        
+        public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) {
+            this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) {
+            this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -203,7 +215,7 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
                         break;
                     case "kafka":
-                        idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
+                        idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers, idempotentRepositoryKafkaMaxCacheSize, idempotentRepositoryKafkaPollDuration);
                         break;
                     default:
                         break;