You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/08/22 10:37:56 UTC

[camel-kafka-connector] 05/06: Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8bbd57027cc708e73a9d5b7f2007f66b9ee1f201
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Jul 26 11:34:35 2021 +0200

    Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog
---
 .../utils/CamelKafkaConnectMain.java               | 172 ++++++++++-----------
 .../camel/kafkaconnector/DataFormatTest.java       |   9 +-
 2 files changed, 80 insertions(+), 101 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 0871307..2e8d3a8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
@@ -31,11 +30,8 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef;
 import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.CamelSourceTask;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -46,6 +42,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelKafkaConnectMain extends SimpleMain {
+    public static final String KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcMarshal.";
+    public static final String KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcUnMarshal.";
+    public static final String KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcAggregator.";
+    public static final String KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcIdempotent.";
+    public static final String KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcRemoveHeader.";
+
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
     protected volatile ConsumerTemplate consumerTemplate;
@@ -228,36 +230,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
+            //error handler
+            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+            if (errorHandler != null) {
+                switch (errorHandler) {
+                    case "no":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+                        break;
+                    case "default":
+                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                        break;
+                    default:
+                        break;
+                }
+            }
+
             //dataformats
             if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+                camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
             }
             if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+                camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
             }
 
             //aggregator
             if (!ObjectHelper.isEmpty(aggregationSize)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
             }
             if (!ObjectHelper.isEmpty(aggregationTimeout)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+                camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
             }
 
             //idempotency
             if (idempotencyEnabled) {
                 switch (expressionType) {
                     case "body":
-                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
-                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
                         break;
                     case "header":
-                        camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
-                        camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+                        camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
                         break;
                     default:
                         break;
@@ -279,117 +290,92 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             //remove headers
             if (!ObjectHelper.isEmpty(headersExcludePattern)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+                camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
             }
 
+            // log filtered properties and set initial camel properties
             List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
             LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
             camelMain.setInitialProperties(camelProperties);
 
-            //error handler
-            camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
-            if (errorHandler != null) {
-                switch (errorHandler) {
-                    case "no":
-                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
-                        break;
-                    case "default":
-                        camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
-                        break;
-                    default:
-                        break;
-                }
-            }
-
             camelMain.configure().addRoutesBuilder(new RouteBuilder() {
                 public void configure() {
 
-                    //creating source template
-                    RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
-                            .templateParameter("fromUrl")
-                            .templateParameter("errorHandler", "ckcErrorHandler")
-
+                    //create marshal template
+                    routeTemplate("ckcMarshal")
                             .templateParameter("marshal", "dummyDataformat")
+                            .from("kamelet:source")
+                            .marshal("{{marshal}}")
+                            .to("kamelet:sink");
+
+                    //create unmarshal template: unmarshals the body with the data format named by {{unmarshal}}
+                    routeTemplate("ckcUnMarshal")
                             .templateParameter("unmarshal", "dummyDataformat")
+                            .from("kamelet:source")
+                            .unmarshal("{{unmarshal}}")
+                            .to("kamelet:sink");
 
-                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+                    //create aggregator template
+                    routeTemplate("ckcAggregator")
+                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
                             .templateParameter("aggregationSize", "1")
                             .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+                            .from("kamelet:source")
+                            .aggregate(constant(true))
+                                .aggregationStrategyRef("{{aggregationStrategy}}")
+                                .completionSize("{{aggregationSize}}")
+                                .completionTimeout("{{aggregationTimeout}}")
+                                .to("kamelet:sink")
+                            .end();
 
+                    //create idempotent template
+                    routeTemplate("ckcIdempotent")
                             .templateParameter("idempotentExpression", "dummyExpression")
                             .templateParameter("idempotentRepository", "ckcIdempotentRepository")
-                            .templateParameter("headersExcludePattern", "(?!)");
-
-
-                    ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
-                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
-                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
-                    }
-                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
-                    }
-
-                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        rdInTemplateSource = rdInTemplateSource.aggregate(s)
-                                .constant(true)
-                                .completionSize("{{aggregationSize}}")
-                                .completionTimeout("{{aggregationTimeout}}");
-                    }
+                            .from("kamelet:source")
+                            .idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}")
+                            .to("kamelet:sink");
 
-                    if (idempotencyEnabled) {
-                        rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
-                    }
+                    //create removeHeader template
+                    routeTemplate("ckcRemoveHeader")
+                            .templateParameter("headersExcludePattern", "(?!)")
+                            .from("kamelet:source")
+                            .removeHeaders("{{headersExcludePattern}}")
+                            .to("kamelet:sink");
 
-                    rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+                    //creating source template
+                    routeTemplate("ckcSource")
+                            .templateParameter("fromUrl")
+                            .templateParameter("errorHandler", "ckcErrorHandler")
+                            .from("{{fromUrl}}")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
                             .to("kamelet:sink");
 
                     //creating sink template
-                    RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+                    routeTemplate("ckcSink")
                             .templateParameter("toUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            .templateParameter("marshal", "dummyDataformat")
-                            .templateParameter("unmarshal", "dummyDataformat")
-
-                            //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
-                            .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
-                            .templateParameter("aggregationSize", "1")
-                            .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
-
-                            .templateParameter("idempotentExpression", "dummyExpression")
-                            .templateParameter("idempotentRepository", "ckcIdempotentRepository")
-                            .templateParameter("headersExcludePattern", "(?!)");
-
+                            .from("kamelet:source")
+                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
+                            .to("{{toUrl}}");
 
-                    ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
-                            .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+                    //creating the actual route
+                    ProcessorDefinition<?> rd = from(from);
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
+                        rd = rd.kamelet("ckcMarshal");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
+                        rd = rd.kamelet("ckcUnMarshal");
                     }
-
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                        AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
-                        rdInTemplateSink = rdInTemplateSink.aggregate(s)
-                                .constant(true)
-                                .completionSize("{{aggregationSize}}")
-                                .completionTimeout("{{aggregationTimeout}}");
+                        rd = rd.kamelet("ckcAggregator");
                     }
-
                     if (idempotencyEnabled) {
-                        rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+                        rd = rd.kamelet("ckcIdempotent");
                     }
-
-                    rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
-                            .to("{{toUrl}}");
-
-                    //creating the actual route
-                    from(from).toD(to);
+                    rd = rd.kamelet("ckcRemoveHeader");
+                    rd.toD(to);
                 }
             });
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index c3d26a4..36a886c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
@@ -26,7 +24,6 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -69,11 +66,7 @@ public class DataFormatTest {
         props.put("camel.sink.marshal", "missingDataformat");
 
         CamelSinkTask camelsinkTask = new CamelSinkTask();
-        camelsinkTask.start(props);
-        List<SinkRecord> records = new ArrayList<SinkRecord>();
-        SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
-        records.add(record);
-        assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
+        assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
         // No need to check the stop method. The error is already thrown/caught during startup.
         camelsinkTask.stop();
     }