You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:46 UTC
[camel-kafka-connector] 03/05: Related to #423 resolved a problem
with marshal/unmarshal after fixin
https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 7055189263b52f1c818ec2443ede0be9837930c3
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed May 19 10:46:13 2021 +0200
Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
---
.../utils/CamelKafkaConnectMain.java | 25 +++++++++++-----------
1 file changed, 12 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 036375b..0871307 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
//dataformats
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
- camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+ camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+ camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
}
//aggregator
@@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
.templateParameter("fromUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
- .templateParameter("marshall", "dummyDataformat")
- .templateParameter("unmarshall", "dummyDataformat")
+ .templateParameter("marshal", "dummyDataformat")
+ .templateParameter("unmarshal", "dummyDataformat")
//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
+ rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
+ rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
}
if (getContext().getRegistry().lookupByName("aggregate") != null) {
@@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
.templateParameter("toUrl")
.templateParameter("errorHandler", "ckcErrorHandler")
- //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-// .templateParameter("marshall", "dummyDataformat")
-// .templateParameter("unmarshall", "dummyDataformat")
+ .templateParameter("marshal", "dummyDataformat")
+ .templateParameter("unmarshal", "dummyDataformat")
//TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -368,10 +367,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
.errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
+ rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
+ rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
}
if (getContext().getRegistry().lookupByName("aggregate") != null) {