You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/08/22 10:37:56 UTC
[camel-kafka-connector] 05/06: Related to #423 modularized kamelets
and composed them to better autogenerate connectors from kamelets catalog
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8bbd57027cc708e73a9d5b7f2007f66b9ee1f201
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Jul 26 11:34:35 2021 +0200
Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog
---
.../utils/CamelKafkaConnectMain.java | 172 ++++++++++-----------
.../camel/kafkaconnector/DataFormatTest.java | 9 +-
2 files changed, 80 insertions(+), 101 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 0871307..2e8d3a8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
@@ -31,11 +30,8 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.CamelSourceTask;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.ProcessorDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -46,6 +42,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CamelKafkaConnectMain extends SimpleMain {
+ public static final String KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcMarshal.";
+ public static final String KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcUnMarshal.";
+ public static final String KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcAggregator.";
+ public static final String KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcIdempotent.";
+ public static final String KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcRemoveHeader.";
+
private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
protected volatile ConsumerTemplate consumerTemplate;
@@ -228,36 +230,45 @@ public class CamelKafkaConnectMain extends SimpleMain {
Properties camelProperties = new Properties();
camelProperties.putAll(props);
+ //error handler
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+ if (errorHandler != null) {
+ switch (errorHandler) {
+ case "no":
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+ break;
+ case "default":
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+ break;
+ default:
+ break;
+ }
+ }
+
//dataformats
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+ camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+ camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
}
//aggregator
if (!ObjectHelper.isEmpty(aggregationSize)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+ camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
}
if (!ObjectHelper.isEmpty(aggregationTimeout)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+ camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
}
//idempotency
if (idempotencyEnabled) {
switch (expressionType) {
case "body":
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+ camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
break;
case "header":
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+ camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
break;
default:
break;
@@ -279,117 +290,92 @@ public class CamelKafkaConnectMain extends SimpleMain {
//remove headers
if (!ObjectHelper.isEmpty(headersExcludePattern)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+ camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
}
+ // log filtered properties and set initial camel properties
List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
camelMain.setInitialProperties(camelProperties);
- //error handler
- camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
- if (errorHandler != null) {
- switch (errorHandler) {
- case "no":
- camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
- break;
- case "default":
- camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
- break;
- default:
- break;
- }
- }
-
camelMain.configure().addRoutesBuilder(new RouteBuilder() {
public void configure() {
- //creating source template
- RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
- .templateParameter("fromUrl")
- .templateParameter("errorHandler", "ckcErrorHandler")
-
+ //create marshal template
+ routeTemplate("ckcMarshal")
.templateParameter("marshal", "dummyDataformat")
+ .from("kamelet:source")
+ .marshal("{{marshal}}")
+ .to("kamelet:sink");
+
+ //create unmarshal template: unmarshals the message using the data format referenced by the "unmarshal" parameter
+ routeTemplate("ckcUnMarshal")
.templateParameter("unmarshal", "dummyDataformat")
+ .from("kamelet:source")
+ .unmarshal("{{unmarshal}}")
+ .to("kamelet:sink");
- //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+ //create aggregator template
+ routeTemplate("ckcAggregator")
+ //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
.templateParameter("aggregationSize", "1")
.templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+ .from("kamelet:source")
+ .aggregate(constant(true))
+ .aggregationStrategyRef("{{aggregationStrategy}}")
+ .completionSize("{{aggregationSize}}")
+ .completionTimeout("{{aggregationTimeout}}")
+ .to("kamelet:sink")
+ .end();
+ //create idempotent template
+ routeTemplate("ckcIdempotent")
.templateParameter("idempotentExpression", "dummyExpression")
.templateParameter("idempotentRepository", "ckcIdempotentRepository")
- .templateParameter("headersExcludePattern", "(?!)");
-
-
- ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
- .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
- if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
- }
- if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
- }
-
- if (getContext().getRegistry().lookupByName("aggregate") != null) {
- AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
- rdInTemplateSource = rdInTemplateSource.aggregate(s)
- .constant(true)
- .completionSize("{{aggregationSize}}")
- .completionTimeout("{{aggregationTimeout}}");
- }
+ .from("kamelet:source")
+ .idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}")
+ .to("kamelet:sink");
- if (idempotencyEnabled) {
- rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
- }
+ //create removeHeader template
+ routeTemplate("ckcRemoveHeader")
+ .templateParameter("headersExcludePattern", "(?!)")
+ .from("kamelet:source")
+ .removeHeaders("{{headersExcludePattern}}")
+ .to("kamelet:sink");
- rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+ //creating source template
+ routeTemplate("ckcSource")
+ .templateParameter("fromUrl")
+ .templateParameter("errorHandler", "ckcErrorHandler")
+ .from("{{fromUrl}}")
+ .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
.to("kamelet:sink");
//creating sink template
- RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+ routeTemplate("ckcSink")
.templateParameter("toUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
- .templateParameter("marshal", "dummyDataformat")
- .templateParameter("unmarshal", "dummyDataformat")
-
- //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
- .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
- .templateParameter("aggregationSize", "1")
- .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
-
- .templateParameter("idempotentExpression", "dummyExpression")
- .templateParameter("idempotentRepository", "ckcIdempotentRepository")
- .templateParameter("headersExcludePattern", "(?!)");
-
+ .from("kamelet:source")
+ .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"))
+ .to("{{toUrl}}");
- ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
- .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+ //creating the actual route
+ ProcessorDefinition<?> rd = from(from);
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
+ rd = rd.kamelet("ckcMarshal");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
+ rd = rd.kamelet("ckcUnMarshal");
}
-
if (getContext().getRegistry().lookupByName("aggregate") != null) {
- AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
- rdInTemplateSink = rdInTemplateSink.aggregate(s)
- .constant(true)
- .completionSize("{{aggregationSize}}")
- .completionTimeout("{{aggregationTimeout}}");
+ rd = rd.kamelet("ckcAggregator");
}
-
if (idempotencyEnabled) {
- rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+ rd = rd.kamelet("ckcIdempotent");
}
-
- rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
- .to("{{toUrl}}");
-
- //creating the actual route
- from(from).toD(to);
+ rd = rd.kamelet("ckcRemoveHeader");
+ rd.toD(to);
}
});
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index c3d26a4..36a886c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,9 +16,7 @@
*/
package org.apache.camel.kafkaconnector;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.camel.component.hl7.HL7DataFormat;
@@ -26,7 +24,6 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -69,11 +66,7 @@ public class DataFormatTest {
props.put("camel.sink.marshal", "missingDataformat");
CamelSinkTask camelsinkTask = new CamelSinkTask();
- camelsinkTask.start(props);
- List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
- records.add(record);
- assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
+ assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
// No need to check the stop method. The error is already thrown/caught during startup.
camelsinkTask.stop();
}