You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/19 18:18:13 UTC

[camel-kafka-connector] branch master updated (1c4d16c -> 3b2c306)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 1c4d16c  Fixed CS
     new d6b7a1b  Added Support for KafkaIdempotentRepository
     new 3b2c306  Fixed CS

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/pom.xml                                       |  4 +++
 .../camel/kafkaconnector/CamelConnectorConfig.java | 14 +++++++++-
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  6 +++++
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 +++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  6 +++++
 .../utils/CamelKafkaConnectMain.java               | 31 +++++++++++++++++++++-
 7 files changed, 67 insertions(+), 4 deletions(-)


[camel-kafka-connector] 02/02: Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3b2c30623ca9aa734dba1d512e93fb35c6d68cf6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 18:38:35 2020 +0100

    Fixed CS
---
 .../org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 8f95655..c926bbc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -203,8 +203,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                         idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
                         break;
                     case "kafka":
-                	    idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
-                	    break;
+                        idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
+                        break;
                     default:
                         break;
                 }


[camel-kafka-connector] 01/02: Added Support for KafkaIdempotentRepository

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d6b7a1b673b0be7cb94b2446c8a7c9462a8b8bdc
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 14:50:31 2020 +0100

    Added Support for KafkaIdempotentRepository
---
 core/pom.xml                                       |  4 +++
 .../camel/kafkaconnector/CamelConnectorConfig.java | 14 +++++++++-
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  6 +++++
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 +++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  6 +++++
 .../utils/CamelKafkaConnectMain.java               | 31 +++++++++++++++++++++-
 7 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index c7ea38d..825f908 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-languages</artifactId>
         </dependency>
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 0332576..42c9a85 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -49,7 +49,11 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     
     public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
-    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If idempotency must be enabled or not";
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT = "memory";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF = "camel.idempotency.repository.type";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC = "The idempotent repository type to use, possible values are memory and kafka";
     
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
@@ -63,6 +67,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
     public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
     
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT = "kafka_idempotent_repository";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF = "camel.idempotency.kafka.topic";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC = "The Kafka topic name to use for the idempotent repository";    
+    
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+    public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers";
+    public static final String  CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live"; 
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 299d578..26f9a6f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -61,7 +61,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
     
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 2cc01a2..256de92 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -89,6 +89,9 @@ public class CamelSinkTask extends SinkTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+            final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+            final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -112,6 +115,9 @@ public class CamelSinkTask extends SinkTask {
                 .withExpressionType(expressionType)
                 .withExpressionHeader(expressionHeader)
                 .withMemoryDimension(memoryDimension)
+                .withIdempotentRepositoryType(idempotentRepositoryType)
+                .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+                .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 70e2b09..edc8e41 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -97,7 +97,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
         .define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
-        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+        .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
     
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4e893ab..439984e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,9 @@ public class CamelSourceTask extends SourceTask {
             final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
             final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
             final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+            final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+            final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+            final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
             
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
@@ -121,6 +124,9 @@ public class CamelSourceTask extends SourceTask {
                 .withExpressionType(expressionType)
                 .withExpressionHeader(expressionHeader)
                 .withMemoryDimension(memoryDimension)
+                .withIdempotentRepositoryType(idempotentRepositoryType)
+                .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+                .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index bebe4a9..8f95655 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
@@ -100,6 +101,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String expressionType;
         private String expressionHeader;
         private int memoryDimension;
+        private String idempotentRepositoryType;
+        private String idempotentRepositoryTopicName;
+        private String idempotentRepositoryKafkaServers;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -165,6 +169,21 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.memoryDimension = memoryDimension;
             return this;
         }
+        
+        public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
+            this.idempotentRepositoryType = idempotentRepositoryType;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
+            this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
+            return this;
+        }
+        
+        public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
+            this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -178,7 +197,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
             
             // Instantianting the idempotent Repository here and inject it in registry to be referenced
             if (idempotencyEnabled) {
-                IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+                IdempotentRepository idempotentRepo = null;
+                switch (idempotentRepositoryType) {
+                    case "memory":
+                        idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+                        break;
+                    case "kafka":
+                	    idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
+                	    break;
+                    default:
+                        break;
+                }
                 camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
             }