You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 08:19:28 UTC
[camel-kafka-connector] 06/18: Idempotency: Lets build the
idempotentRepository before and use a reference to it in registry
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 741873aa2977274446d8d29a6983dd57eef6d790
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 07:24:02 2020 +0100
Idempotency: Lets build the idempotentRepository before and use a reference to it in registry
---
.../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index acc97ca..1627a8d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
@@ -174,6 +175,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
camelMain.setInitialProperties(camelProperties);
+
+ // Instantiating the idempotent repository here and binding it in the registry so the routes can reference it by name
+ if (idempotencyEnabled) {
+ IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+ camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+ }
//creating the actual route
camelMain.configure().addRoutesBuilder(new RouteBuilder() {
@@ -213,14 +220,14 @@ public class CamelKafkaConnectMain extends SimpleMain {
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
+ "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
LOG.info(".to({})", to);
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
break;
case "header":
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
+ "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
LOG.info(".to({})", to);
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
- .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
break;
default:
break;
@@ -235,11 +242,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
switch (expressionType) {
case "body":
LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
- rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
break;
case "header":
LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
- rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+ rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
break;
default:
break;