You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:46 UTC

[camel-kafka-connector] 03/05: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7055189263b52f1c818ec2443ede0be9837930c3
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Wed May 19 10:46:13 2021 +0200

    Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
---
 .../utils/CamelKafkaConnectMain.java               | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 036375b..0871307 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             //dataformats
             if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat);
             }
             if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat);
             }
 
             //aggregator
@@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
 
-                            .templateParameter("marshall", "dummyDataformat")
-                            .templateParameter("unmarshall", "dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
+                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
+                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
@@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
                             .templateParameter("toUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", "dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -368,10 +367,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {