You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/19 18:18:14 UTC

[camel-kafka-connector] 01/02: Added Support for KafkaIdempotentRepository

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d6b7a1b673b0be7cb94b2446c8a7c9462a8b8bdc
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 14:50:31 2020 +0100

    Added Support for KafkaIdempotentRepository
---
 core/pom.xml                                       |  4 +++
 .../camel/kafkaconnector/CamelConnectorConfig.java | 14 +++++++++-
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  6 +++++
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 +++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  6 +++++
 .../utils/CamelKafkaConnectMain.java               | 31 +++++++++++++++++++++-
 7 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index c7ea38d..825f908 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-languages</artifactId>
         </dependency>
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 0332576..42c9a85 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -49,7 +49,11 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     
     public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
-    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If idempotency must be enabled or not";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT = "memory";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF = "camel.idempotency.repository.type";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC = "The idempotent repository type to use, possible values are memory and kafka";
     
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
@@ -63,6 +67,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
     
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT = "kafka_idempotent_repository";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF = "camel.idempotency.kafka.topic";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC = "The Kafka topic name to use for the idempotent repository";    
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers";
+    public static final String  CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live"; 
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 299d578..26f9a6f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -61,7 +61,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 2cc01a2..256de92 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -89,6 +89,9 @@ public class CamelSinkTask extends SinkTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+            final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+            final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -112,6 +115,9 @@ public class CamelSinkTask extends SinkTask {
                 .withExpressionType(expressionType)
                 .withExpressionHeader(expressionHeader)
                 .withMemoryDimension(memoryDimension)
+                .withIdempotentRepositoryType(idempotentRepositoryType)
+                .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+                .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 70e2b09..edc8e41 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -97,7 +97,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4e893ab..439984e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,9 @@ public class CamelSourceTask extends SourceTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+            final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+            final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -121,6 +124,9 @@ public class CamelSourceTask extends SourceTask {
                 .withExpressionType(expressionType)
                 .withExpressionHeader(expressionHeader)
                 .withMemoryDimension(memoryDimension)
+                .withIdempotentRepositoryType(idempotentRepositoryType)
+                .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+                .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index bebe4a9..8f95655 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
@@ -100,6 +101,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String expressionType;
         private String expressionHeader;
         private int memoryDimension;
+        private String idempotentRepositoryType;
+        private String idempotentRepositoryTopicName;
+        private String idempotentRepositoryKafkaServers;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -165,6 +169,21 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.memoryDimension = memoryDimension;
             return this;
         }
+        
+        public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
+            this.idempotentRepositoryType = idempotentRepositoryType;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
+            this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
+            this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -178,7 +197,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
             
             // Instantianting the idempotent Repository here and inject it in registry to be referenced
             if (idempotencyEnabled) {
-                IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+                IdempotentRepository idempotentRepo = null;
+                switch (idempotentRepositoryType) {
+                    case "memory":
+                        idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+                        break;
+                    case "kafka":
+                	    idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
+                	    break;
+                    default:
+                        break;
+                }
                 camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
             }