You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/07/27 13:49:45 UTC

[camel-kafka-connector] 02/05: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c653fddf57c4d5ed96eaa09faf769ca6607304ee
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sat May 15 07:59:12 2021 +0200

    Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources
---
 .../utils/CamelKafkaConnectMain.java               | 29 +++++++++++-----------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  5 ----
 2 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 6e7dbdf..036375b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -228,16 +228,15 @@ public class CamelKafkaConnectMain extends SimpleMain {
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//            //dataformats
-//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
-//            }
-//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-//                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-//                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
-//            }
+            //dataformats
+            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat);
+            }
+            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+                camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat);
+            }
 
             //aggregator
             if (!ObjectHelper.isEmpty(aggregationSize)) {
@@ -310,9 +309,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSource = routeTemplate("ckcSource")
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", "ckcErrorHandler")
-                            //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", "dummyDataformat")
+
+                            .templateParameter("marshall", "dummyDataformat")
+                            .templateParameter("unmarshall", "dummyDataformat")
 
                             //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -327,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}")
                             .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat);
+                        rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b1271ac..36ae9e2 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -235,9 +235,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
@@ -261,8 +258,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size());
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {