You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:44 UTC
[camel-kafka-connector] 01/05: Related to #423 : caonverted source
and sink to use camel-kamelets.
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8324995d00da2f9687e4d25733e2f5233d574a8d
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Apr 26 01:38:42 2021 +0200
Related to #423 : caonverted source and sink to use camel-kamelets.
---
core/pom.xml | 10 +-
.../apache/camel/kafkaconnector/CamelSinkTask.java | 5 +-
.../camel/kafkaconnector/CamelSourceTask.java | 5 +-
.../utils/CamelKafkaConnectMain.java | 272 ++++++++++++---------
.../camel/kafkaconnector/CamelSourceTaskTest.java | 10 +-
.../camel/kafkaconnector/DataFormatTest.java | 10 +-
6 files changed, 191 insertions(+), 121 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index f6c014f..3138163 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -53,12 +53,20 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-kafka</artifactId>
+ <artifactId>camel-kamelet</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-languages</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-xml-jaxb</artifactId>
+ </dependency>
<!-- Tools -->
<dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..4e5a201 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CamelSinkTask extends SinkTask {
+ public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink.";
+
public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key";
public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -119,8 +121,9 @@ public class CamelSinkTask extends SinkTask {
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
}
+ actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
- cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+ cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 00ce145..77ce636 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
public class CamelSourceTask extends SourceTask {
+ public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource.";
public static final String HEADER_CAMEL_PREFIX = "CamelHeader.";
public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty.";
@@ -145,8 +146,9 @@ public class CamelSourceTask extends SourceTask {
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
+ actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);
- cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+ cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
@@ -171,6 +173,7 @@ public class CamelSourceTask extends SourceTask {
consumer.start();
cms.start();
+
LOG.info("CamelSourceTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel context", e);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index d031b20..6e7dbdf 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -26,10 +26,16 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.DefaultErrorHandlerBuilder;
+import org.apache.camel.builder.ErrorHandlerBuilderRef;
+import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.CamelSourceTask;
import org.apache.camel.main.SimpleMain;
-import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
@@ -40,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CamelKafkaConnectMain extends SimpleMain {
- public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
protected volatile ConsumerTemplate consumerTemplate;
@@ -140,67 +145,67 @@ public class CamelKafkaConnectMain extends SimpleMain {
this.aggregationTimeout = aggregationTimeout;
return this;
}
-
+
public Builder withErrorHandler(String errorHandler) {
this.errorHandler = errorHandler;
return this;
}
-
+
public Builder withMaxRedeliveries(int maxRedeliveries) {
this.maxRedeliveries = maxRedeliveries;
return this;
}
-
+
public Builder withRedeliveryDelay(long redeliveryDelay) {
this.redeliveryDelay = redeliveryDelay;
return this;
}
-
+
public Builder withIdempotencyEnabled(boolean idempotencyEnabled) {
this.idempotencyEnabled = idempotencyEnabled;
return this;
}
-
+
public Builder withExpressionType(String expressionType) {
this.expressionType = expressionType;
return this;
}
-
+
public Builder withExpressionHeader(String expressionHeader) {
this.expressionHeader = expressionHeader;
return this;
}
-
+
public Builder withMemoryDimension(int memoryDimension) {
this.memoryDimension = memoryDimension;
return this;
}
-
+
public Builder withIdempotentRepositoryType(String idempotentRepositoryType) {
this.idempotentRepositoryType = idempotentRepositoryType;
return this;
}
-
+
public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) {
this.idempotentRepositoryTopicName = idempotentRepositoryTopicName;
return this;
}
-
+
public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) {
this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers;
return this;
}
-
+
public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) {
this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize;
return this;
}
-
+
public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) {
this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration;
return this;
}
-
+
public Builder withHeadersExcludePattern(String headersExcludePattern) {
this.headersExcludePattern = headersExcludePattern;
return this;
@@ -214,21 +219,51 @@ public class CamelKafkaConnectMain extends SimpleMain {
return entry.getKey() + "=" + entry.getValue();
}
-
public CamelKafkaConnectMain build(CamelContext camelContext) {
CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
camelMain.configure().setAutoConfigurationLogSummary(false);
+ //TODO: make it configurable
+ camelMain.configure().setDumpRoutes(true);
Properties camelProperties = new Properties();
camelProperties.putAll(props);
- List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+ //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+// //dataformats
+// if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+// }
+// if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+// }
- LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
- camelMain.setInitialProperties(camelProperties);
-
- // Instantianting the idempotent Repository here and inject it in registry to be referenced
+ //aggregator
+ if (!ObjectHelper.isEmpty(aggregationSize)) {
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize));
+ }
+ if (!ObjectHelper.isEmpty(aggregationTimeout)) {
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout));
+ }
+
+ //idempotency
if (idempotencyEnabled) {
+ switch (expressionType) {
+ case "body":
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}");
+ break;
+ case "header":
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}");
+ break;
+ default:
+ break;
+ }
+ // Instantiating the idempotent Repository here and inject it in registry to be referenced
IdempotentRepository idempotentRepo = null;
switch (idempotentRepositoryType) {
case "memory":
@@ -240,110 +275,123 @@ public class CamelKafkaConnectMain extends SimpleMain {
default:
break;
}
- camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo);
+ camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo);
+ }
+
+ //remove headers
+ if (!ObjectHelper.isEmpty(headersExcludePattern)) {
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern);
+ }
+
+ List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList());
+ LOG.info("Setting initial properties in Camel context: [{}]", filteredProps);
+ camelMain.setInitialProperties(camelProperties);
+
+ //error handler
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder());
+ if (errorHandler != null) {
+ switch (errorHandler) {
+ case "no":
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder());
+ break;
+ case "default":
+ camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+ break;
+ default:
+ break;
+ }
}
- //creating the actual route
camelMain.configure().addRoutesBuilder(new RouteBuilder() {
public void configure() {
- //from
- RouteDefinition rd = from(from);
- LOG.info("Creating Camel route from({})", from);
-
- if (!ObjectHelper.isEmpty(errorHandler)) {
- switch (errorHandler) {
- case "no":
- rd.errorHandler(noErrorHandler());
- break;
- case "default":
- rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
- break;
- default:
- break;
- }
+
+ //creating source template
+ RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
+ .templateParameter("fromUrl")
+ .templateParameter("errorHandler", "ckcErrorHandler")
+ //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+// .templateParameter("marshall", "dummyDataformat")
+// .templateParameter("unmarshall", "dummyDataformat")
+
+ //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+ .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+ .templateParameter("aggregationSize", "1")
+ .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+ .templateParameter("idempotentExpression", "dummyExpression")
+ .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+ .templateParameter("headersExcludePattern", "(?!)");
+
+
+ ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
+ .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
+ if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+ rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+ }
+ if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+ rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
}
- //dataformats
+ if (getContext().getRegistry().lookupByName("aggregate") != null) {
+ AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
+ rdInTemplateSource = rdInTemplateSource.aggregate(s)
+ .constant(true)
+ .completionSize("{{aggregationSize}}")
+ .completionTimeout("{{aggregationTimeout}}");
+ }
+
+ if (idempotencyEnabled) {
+ rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+ }
+
+ rdInTemplateSource.removeHeaders("{{headersExcludePattern}}")
+ .to("kamelet:sink");
+
+ //creating sink template
+ RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
+ .templateParameter("toUrl")
+ .templateParameter("errorHandler", "ckcErrorHandler")
+ //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
+// .templateParameter("marshall", "dummyDataformat")
+// .templateParameter("unmarshall", "dummyDataformat")
+
+ //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
+ .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
+ .templateParameter("aggregationSize", "1")
+ .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE))
+
+ .templateParameter("idempotentExpression", "dummyExpression")
+ .templateParameter("idempotentRepository", "ckcIdempotentRepository")
+ .templateParameter("headersExcludePattern", "(?!)");
+
+
+ ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
+ .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- LOG.info(".marshal({})", marshallDataFormat);
- rd.marshal(marshallDataFormat);
+ rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- LOG.info(".unmarshal({})", unmarshallDataFormat);
- rd.unmarshal(unmarshallDataFormat);
+ rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
}
+
if (getContext().getRegistry().lookupByName("aggregate") != null) {
- //aggregation
AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class);
- if (idempotencyEnabled) {
- switch (expressionType) {
- case "body":
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + "
- + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
- LOG.info(".to({})", to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
- } else {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
- .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
- }
- break;
- case "header":
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + "
- + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension);
- LOG.info(".to({})", to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
- .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
- } else {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout)
- .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
- }
- break;
- default:
- break;
- }
- } else {
- LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
- LOG.info(".to({})", to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
- } else {
- rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to);
- }
- }
- } else {
- if (idempotencyEnabled) {
- switch (expressionType) {
- case "body":
- LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to);
- } else {
- rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
- }
- break;
- case "header":
- LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to);
- } else {
- rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to);
- }
- break;
- default:
- break;
- }
- } else {
- //to
- LOG.info(".to({})", to);
- if (ObjectHelper.isEmpty(headersExcludePattern)) {
- rd.toD(to);
- } else {
- rd.removeHeaders(headersExcludePattern).toD(to);
- }
- }
+ rdInTemplateSink = rdInTemplateSink.aggregate(s)
+ .constant(true)
+ .completionSize("{{aggregationSize}}")
+ .completionTimeout("{{aggregationTimeout}}");
}
+
+ if (idempotencyEnabled) {
+ rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}");
+ }
+
+ rdInTemplateSink.removeHeaders("{{headersExcludePattern}}")
+ .to("{{toUrl}}");
+
+ //creating the actual route
+ from(from).toD(to);
}
});
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 5c99ad0..b1271ac 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSourceTaskTest {
-
private static final String DIRECT_URI = "direct:start";
private static final String TOPIC_NAME = "my-topic";
@@ -225,7 +224,7 @@ public class CamelSourceTaskTest {
}
@Test
- public void testUrlPrecedenceOnComponentProperty() {
+ public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2");
@@ -236,7 +235,8 @@ public class CamelSourceTaskTest {
CamelSourceTask sourceTask = new CamelSourceTask();
sourceTask.start(props);
- assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
+
sourceTask.getCms().getCamelContext().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("timer"))
@@ -261,10 +261,10 @@ public class CamelSourceTaskTest {
CamelSourceTask sourceTask = new CamelSourceTask();
sourceTask.start(props);
- assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
+// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
sourceTask.getCms().getCamelContext().getEndpoints().stream()
- .filter(e -> e.getEndpointUri().startsWith("direct"))
+ .filter(e -> e.getEndpointUri().startsWith("seda"))
.forEach(e -> {
assertTrue(e.getEndpointUri().contains("end"));
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 6715843..c3d26a4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.kafkaconnector;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.camel.component.hl7.HL7DataFormat;
@@ -24,6 +26,7 @@ import org.apache.camel.component.syslog.SyslogDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -66,8 +69,13 @@ public class DataFormatTest {
props.put("camel.sink.marshal", "missingDataformat");
CamelSinkTask camelsinkTask = new CamelSinkTask();
- assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
+ camelsinkTask.start(props);
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42);
+ records.add(record);
+ assertThrows(ConnectException.class, () -> camelsinkTask.put(records));
// No need to check the stop method. The error is already thrown/caught during startup.
+ camelsinkTask.stop();
}
@Test