You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:44 UTC

[camel-kafka-connector] 01/05: Related to #423 : converted source and sink to use camel-kamelets.

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8324995d00da2f9687e4d25733e2f5233d574a8d
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Apr 26 01:38:42 2021 +0200

    Related to #423 : converted source and sink to use camel-kamelets.
---
 core/pom.xml                                       |  10 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   5 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |   5 +-
 .../utils/CamelKafkaConnectMain.java               | 272 ++++++++++++---------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  10 +-
 .../camel/kafkaconnector/DataFormatTest.java       |  10 +-
 6 files changed, 191 insertions(+), 121 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index f6c014f..3138163 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -53,12 +53,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-kafka</artifactId>
+            <artifactId>camel-kamelet</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-languages</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-xml-jaxb</artifactId>
+        </dependency>
 
         <!-- Tools -->
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..4e5a201 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelSinkTask extends SinkTask {
+    public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink.";
+
     public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -119,8 +121,9 @@ public class CamelSinkTask extends SinkTask {
                                                 CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 00ce145..77ce636 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 
 public class CamelSourceTask extends SourceTask {
+    public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource.";
     public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
     public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
 
@@ -145,8 +146,9 @@ public class CamelSourceTask extends SourceTask {
                                                 config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
+            actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);
 
-            cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+            cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
                 .withProperties(actualProps)
                 .withUnmarshallDataFormat(unmarshaller)
                 .withMarshallDataFormat(marshaller)
@@ -171,6 +173,7 @@ public class CamelSourceTask extends SourceTask {
             consumer.start();
 
             cms.start();
+
             LOG.info("CamelSourceTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel context", e);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d031b20..6e7dbdf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,10 +26,16 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.CamelSourceTask;
 import org.apache.camel.main.SimpleMain;
-import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -40,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelKafkaConnectMain extends SimpleMain {
-    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
     protected volatile ConsumerTemplate consumerTemplate;
@@ -140,67 +145,67 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.aggregationTimeout = aggregationTimeout;
             return this;
         }
-        
+
         public Builder withErrorHandler(String errorHandler) {
             this.errorHandler = errorHandler;
             return this;
         }
-        
+
         public Builder withMaxRedeliveries(int maxRedeliveries) {
             this.maxRedeliveries = maxRedeliveries;
             return this;
         }
-        
+
         public Builder withRedeliveryDelay(long redeliveryDelay) {
             this.redeliveryDelay = redeliveryDelay;
             return this;
         }
-        
+
         public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
             this.idempotencyEnabled = idempotencyEnabled;
             return this;
         }
-        
+
         public Builder withExpressionType(String expressionType) {
             this.expressionType = expressionType;
             return this;
         }
-        
+
         public Builder withExpressionHeader(String expressionHeader) {
             this.expressionHeader = expressionHeader;
             return this;
         }
-        
+
         public Builder withMemoryDimension(int memoryDimension) {
             this.memoryDimension = memoryDimension;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
             this.idempotentRepositoryType = idempotentRepositoryType;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
             this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
             this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) {
             this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize;
             return this;
         }
-        
+
         public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) {
             this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration;
             return this;
         }
-        
+
         public Builder withHeadersExcludePattern(String headersExcludePattern) {
             this.headersExcludePattern = headersExcludePattern;
             return this;
@@ -214,21 +219,51 @@ public class CamelKafkaConnectMain extends SimpleMain {
             return entry.getKey() + "=" + entry.getValue();
         }
 
-
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
             camelMain.configure().setAutoConfigurationLogSummary(false);
+            //TODO: make it configurable
+            camelMain.configure().setDumpRoutes(true);
 
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//            //dataformats
+//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+//            }
+//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+//            }
 
-            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
-            camelMain.setInitialProperties(camelProperties);
-            
-            // Instantianting the idempotent Repository here and inject it in registry to be referenced
+            //aggregator
+            if (!ObjectHelper.isEmpty(aggregationSize)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+            }
+            if (!ObjectHelper.isEmpty(aggregationTimeout)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+            }
+
+            //idempotency
             if (idempotencyEnabled) {
+                switch (expressionType) {
+                    case "body":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        break;
+                    case "header":
+                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        break;
+                    default:
+                        break;
+                }
+                // Instantiating the idempotent Repository here and inject it in registry to be referenced
                 IdempotentRepository idempotentRepo = null;
                 switch (idempotentRepositoryType) {
                     case "memory":
@@ -240,110 +275,123 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     default:
                         break;
                 }
-                camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+                camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
+            }
+
+            //remove headers
+            if (!ObjectHelper.isEmpty(headersExcludePattern)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+            }
+
+            List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+            LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
+            camelMain.setInitialProperties(camelProperties);
+
+            //error handler
+            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+            if (errorHandler != null) {
+                switch (errorHandler) {
+                    case "no":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+                        break;
+                    case "default":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                        break;
+                    default:
+                        break;
+                }
             }
 
-            //creating the actual route
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                 public void configure() {
-                    //from
-                    RouteDefinition rd = from(from);
-                    LOG.info("Creating Camel route from({})", from);
-                    
-                    if (!ObjectHelper.isEmpty(errorHandler)) {
-                        switch (errorHandler) {
-                            case "no":
-                                rd.errorHandler(noErrorHandler());
-                                break;
-                            case "default":
-                                rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-                                break;
-                            default:
-                                break;
-                        }
+
+                    //creating source template
+                    RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
+                            .templateParameter("fromUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+                    }
+                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                        rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
                     }
 
-                    //dataformats
+                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
+                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
+                        rdInTemplateSource = rdInTemplateSource.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
+                    }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+                            .to("kamelet:sink");
+
+                    //creating sink template
+                    RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+                            .templateParameter("toUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+//                            .templateParameter("marshall", "dummyDataformat")
+//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
+                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+                            .templateParameter("aggregationSize", "1")
+                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+                            .templateParameter("idempotentExpression", "dummyExpression")
+                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+                            .templateParameter("headersExcludePattern", "(?!)");
+
+
+                    ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        LOG.info(".marshal({})", marshallDataFormat);
-                        rd.marshal(marshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        LOG.info(".unmarshal({})", unmarshallDataFormat);
-                        rd.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
                     }
+
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        //aggregation
                         AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
-                                           + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
-                                    LOG.info(".to({})", to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
-                                            .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
-                            } else {
-                                rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
-                    } else {
-                        if (idempotencyEnabled) {
-                            switch (expressionType) {
-                                case "body":
-                                    LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                case "header":
-                                    LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
-                                    if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
-                                    } else {
-                                        rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
-                                    }
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            //to
-                            LOG.info(".to({})", to);
-                            if (ObjectHelper.isEmpty(headersExcludePattern)) {
-                                rd.toD(to);
-                            } else {
-                                rd.removeHeaders(headersExcludePattern).toD(to);
-                            }
-                        }
+                        rdInTemplateSink = rdInTemplateSink.aggregate(s)
+                                .constant(true)
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}");
                     }
+
+                    if (idempotencyEnabled) {
+                        rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                    }
+
+                    rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
+                            .to("{{toUrl}}");
+
+                    //creating the actual route
+                    from(from).toD(to);
                 }
             });
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 5c99ad0..b1271ac 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSourceTaskTest {
-
     private static final String DIRECT_URI = "direct:start";
     private static final String TOPIC_NAME = "my-topic";
 
@@ -225,7 +224,7 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() {
+    public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2");
@@ -236,7 +235,8 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
+
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
@@ -261,10 +261,10 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
-                .filter(e -> e.getEndpointUri().startsWith("direct"))
+                .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("end"));
                     assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 6715843..c3d26a4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.kafkaconnector;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
@@ -24,6 +26,7 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,8 +69,13 @@ public class DataFormatTest {
         props.put("camel.sink.marshal", "missingDataformat");
 
         CamelSinkTask camelsinkTask = new CamelSinkTask();
-        assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
+        camelsinkTask.start(props);
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
+        records.add(record);
+        assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
         // No need to check the stop method. The error is already thrown/caught during startup.
+        camelsinkTask.stop();
     }
 
     @Test