You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 08:19:28 UTC

[camel-kafka-connector] 06/18: Idempotency: Let's build the idempotentRepository beforehand and use a reference to it in the registry

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 741873aa2977274446d8d29a6983dd57eef6d790
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 19 07:24:02 2020 +0100

    Idempotency: Let's build the idempotentRepository beforehand and use a reference to it in the registry
---
 .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index acc97ca..1627a8d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -28,6 +28,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -174,6 +175,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
             camelMain.setInitialProperties(camelProperties);
+            
+            // Instantiate the idempotent repository here and bind it in the registry so the routes can reference it by name
+            if (idempotencyEnabled) {
+            	IdempotentRepository idempotentRepo = MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension);
+            	camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+            }
 
             //creating the actual route
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
@@ -213,14 +220,14 @@ public class CamelKafkaConnectMain extends SimpleMain {
                                     LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
                                            + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
-                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 case "header":
                                     LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
                                            + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
                                     LOG.info(".to({})", to);
                                     rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                        .idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                        .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 default:
                                     break;
@@ -235,11 +242,11 @@ public class CamelKafkaConnectMain extends SimpleMain {
                             switch (expressionType) {
                                 case "body":
                                     LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    rd.idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 case "header":
                                     LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    rd.idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(memoryDimension)).toD(to);
+                                    rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
                                     break;
                                 default:
                                     break;