You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 06:48:27 UTC
[camel-kafka-connector] 08/17: Added Support for
KafkaIdempotentRepository
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-master-align-and-rebase
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit ad4fdfd27779304af1de30306f186e5bf1fa8e17
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 14:50:31 2020 +0100
Added Support for KafkaIdempotentRepository
---
core/pom.xml | 4 +++
.../camel/kafkaconnector/CamelConnectorConfig.java | 14 +++++++++-
.../kafkaconnector/CamelSinkConnectorConfig.java | 5 +++-
.../apache/camel/kafkaconnector/CamelSinkTask.java | 6 +++++
.../kafkaconnector/CamelSourceConnectorConfig.java | 5 +++-
.../camel/kafkaconnector/CamelSourceTask.java | 6 +++++
.../utils/CamelKafkaConnectMain.java | 31 +++++++++++++++++++++-
7 files changed, 67 insertions(+), 4 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index c7ea38d..825f908 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-core-languages</artifactId>
</dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index 0332576..42c9a85 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -49,7 +49,11 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final Boolean CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT = false;
public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF = "camel.idempotency.enabled";
- public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If in memory idempotency must be enabled or not";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC = "If idempotency must be enabled or not";
+
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT = "memory";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF = "camel.idempotency.repository.type";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC = "The idempotent repository type to use, possible values are memory and kafka";
public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT = "body";
public static final String CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF = "camel.idempotency.expression.type";
@@ -63,6 +67,14 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF = "camel.idempotency.memory.dimension";
public static final String CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC = "The Memory dimension of the in memory idempotent Repository";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT = "kafka_idempotent_repository";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF = "camel.idempotency.kafka.topic";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC = "The Kafka topic name to use for the idempotent repository";
+
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF = "camel.idempotency.kafka.bootstrap.servers";
+ public static final String CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC = "A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live";
+
protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
super(definition, originals, configProviderProps, doLog);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 299d578..26f9a6f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -61,7 +61,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
- .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 2cc01a2..256de92 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -89,6 +89,9 @@ public class CamelSinkTask extends SinkTask {
final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+ final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+ final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+ final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
@@ -112,6 +115,9 @@ public class CamelSinkTask extends SinkTask {
.withExpressionType(expressionType)
.withExpressionHeader(expressionHeader)
.withMemoryDimension(memoryDimension)
+ .withIdempotentRepositoryType(idempotentRepositoryType)
+ .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+ .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
.build(camelContext);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 70e2b09..edc8e41 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -97,7 +97,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_DOC)
.define(CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_DOC)
- .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC);
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_DOC)
+ .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC);
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4e893ab..439984e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -95,6 +95,9 @@ public class CamelSourceTask extends SourceTask {
final String expressionType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
final String expressionHeader = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
final int memoryDimension = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
+ final String idempotentRepositoryType = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
+ final String idempotentRepositoryKafkaTopic = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
+ final String idempotentRepositoryBootstrapServers = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
@@ -121,6 +124,9 @@ public class CamelSourceTask extends SourceTask {
.withExpressionType(expressionType)
.withExpressionHeader(expressionHeader)
.withMemoryDimension(memoryDimension)
+ .withIdempotentRepositoryType(idempotentRepositoryType)
+ .withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
+ .withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
.build(camelContext);
consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index bebe4a9..8f95655 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.support.service.ServiceHelper;
@@ -100,6 +101,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
private String expressionType;
private String expressionHeader;
private int memoryDimension;
+ private String idempotentRepositoryType;
+ private String idempotentRepositoryTopicName;
+ private String idempotentRepositoryKafkaServers;
public Builder(String from, String to) {
this.from = from;
@@ -165,6 +169,21 @@ public class CamelKafkaConnectMain extends SimpleMain {
this.memoryDimension = memoryDimension;
return this;
}
+
+ public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
+ this.idempotentRepositoryType = idempotentRepositoryType;
+ return this;
+ }
+
+ public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
+ this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
+ return this;
+ }
+
+ public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
+ this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
+ return this;
+ }
public CamelKafkaConnectMain build(CamelContext camelContext) {
CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -178,7 +197,17 @@ public class CamelKafkaConnectMain extends SimpleMain {
// Instantianting the idempotent Repository here and inject it in registry to be referenced
if (idempotencyEnabled) {
- IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+ IdempotentRepository idempotentRepo = null;
+ switch (idempotentRepositoryType) {
+ case "memory":
+ idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+ break;
+ case "kafka":
+ idempotentRepo = new KafkaIdempotentRepository(idempotentRepositoryTopicName, idempotentRepositoryKafkaServers);
+ break;
+ default:
+ break;
+ }
camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
}