You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/13 05:16:58 UTC

[camel-kafka-connector] 06/07: core: use camel's built-in support for configuring data formats through properties #497

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 95ba3e1aa57391f1ce6e37b248498cb95e6c9d31
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 9 10:48:32 2020 +0200

    core: use camel's built-in support for configuring data formats through properties #497
---
 .../utils/CamelKafkaConnectMain.java               | 42 +++-------------------
 .../camel/kafkaconnector/DataFormatTest.java       | 11 +++---
 2 files changed, 9 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 191feb0..36ec56a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,15 +22,12 @@ import java.util.Properties;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -144,14 +141,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
                     //dataformats
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        LOG.info(".marshal().custom({})", marshallDataFormat);
-                        getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat));
-                        rd.marshal().custom(marshallDataFormat);
+                        LOG.info(".marshal({})", marshallDataFormat);
+                        rd.marshal(marshallDataFormat);
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        LOG.info(".unmarshal().custom({})", unmarshallDataFormat);
-                        getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat));
-                        rd.unmarshal().custom(unmarshallDataFormat);
+                        LOG.info(".unmarshal({})", unmarshallDataFormat);
+                        rd.unmarshal(unmarshallDataFormat);
                     }
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
                         //aggregation
@@ -170,33 +165,4 @@ public class CamelKafkaConnectMain extends SimpleMain {
             return camelMain;
         }
     }
-
-    private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) {
-        DataFormat df = camelContext.resolveDataFormat(dataformatName);
-
-        if (df == null) {
-            df = camelContext.createDataFormat(dataformatName);
-
-            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
-            final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
-
-            CamelContextAware.trySetCamelContext(df, camelContext);
-
-            if (!props.isEmpty()) {
-                PropertyBindingSupport.build()
-                    .withCamelContext(camelContext)
-                    .withOptionPrefix(prefix)
-                    .withRemoveParameters(false)
-                    .withProperties((Map) props)
-                    .withTarget(df)
-                    .bind();
-            }
-        }
-
-        //TODO: move it to the caller?
-        if (df == null) {
-            throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
-        }
-        return df;
-    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 0ce5ab0..4e309d4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -20,9 +20,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
+import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
-import org.apache.camel.model.dataformat.SyslogDataFormat;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
 
@@ -94,9 +94,9 @@ public class DataFormatTest {
         dcc.getRegistry().bind("syslog", syslogDf);
 
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertNotNull(hl7dfLoaded);
-        SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class);
+        SyslogDataFormat syslogDfLoaded = (SyslogDataFormat)dcc.resolveDataFormat("syslog");
         assertNotNull(syslogDfLoaded);
         cms.stop();
     }
@@ -119,7 +119,7 @@ public class DataFormatTest {
         dcc.getRegistry().bind("hl7", hl7df);
 
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertFalse(hl7dfLoaded.isValidate());
         cms.stop();
     }
@@ -139,9 +139,8 @@ public class DataFormatTest {
             .withMarshallDataFormat("hl7")
             .build(dcc);
 
-
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertTrue(hl7dfLoaded.isValidate());
         cms.stop();
     }