You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:45 UTC
[camel-kafka-connector] 02/05: Related to #423 resolved a problem
with marshal/unmarshal after fixing
https://issues.apache.org/jira/browse/CAMEL-16551 for sources
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c653fddf57c4d5ed96eaa09faf769ca6607304ee
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat May 15 07:59:12 2021 +0200
Related to #423 resolved a problem with marshal/unmarshal after fixing https://issues.apache.org/jira/browse/CAMEL-16551 for sources
---
.../utils/CamelKafkaConnectMain.java | 29 +++++++++++-----------
.../camel/kafkaconnector/CamelSourceTaskTest.java | 5 ----
2 files changed, 14 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 6e7dbdf..036375b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -228,16 +228,15 @@ public class CamelKafkaConnectMain extends SimpleMain {
Properties camelProperties = new Properties();
camelProperties.putAll(props);
- //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-// //dataformats
-// if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-// }
-// if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-// }
+ //dataformats
+ if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+ }
+ if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+ }
//aggregator
if (!ObjectHelper.isEmpty(aggregationSize)) {
@@ -310,9 +309,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
.templateParameter("fromUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
- //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-// .templateParameter("marshall", "dummyDataformat")
-// .templateParameter("unmarshall", "dummyDataformat")
+
+ .templateParameter("marshall", "dummyDataformat")
+ .templateParameter("unmarshall", "dummyDataformat")
//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -327,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+ rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
+ rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
}
if (getContext().getRegistry().lookupByName("aggregate") != null) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b1271ac..36ae9e2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -235,9 +235,6 @@ public class CamelSourceTaskTest {
CamelSourceTask sourceTask = new CamelSourceTask();
sourceTask.start(props);
-// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
-
sourceTask.getCms().getCamelContext().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("timer"))
.forEach(e -> {
@@ -261,8 +258,6 @@ public class CamelSourceTaskTest {
CamelSourceTask sourceTask = new CamelSourceTask();
sourceTask.start(props);
-// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
sourceTask.getCms().getCamelContext().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("seda"))
.forEach(e -> {