You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/19 08:32:46 UTC
[camel-kafka-connector] 06/12: core: use camel's built-in support
for configuring data formats trough properties #497
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit dd88f744c51e27a60626987913ce2e540ba3c142
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 9 10:48:32 2020 +0200
core: use camel's built-in support for configuring data formats trough properties #497
---
.../utils/CamelKafkaConnectMain.java | 42 +++-------------------
.../camel/kafkaconnector/DataFormatTest.java | 11 +++---
2 files changed, 9 insertions(+), 44 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 191feb0..36ec56a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,15 +22,12 @@ import java.util.Properties;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.kafkaconnector.CamelConnectorConfig;
import org.apache.camel.main.SimpleMain;
import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -144,14 +141,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
//dataformats
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
- LOG.info(".marshal().custom({})", marshallDataFormat);
- getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat));
- rd.marshal().custom(marshallDataFormat);
+ LOG.info(".marshal({})", marshallDataFormat);
+ rd.marshal(marshallDataFormat);
}
if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
- LOG.info(".unmarshal().custom({})", unmarshallDataFormat);
- getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat));
- rd.unmarshal().custom(unmarshallDataFormat);
+ LOG.info(".unmarshal({})", unmarshallDataFormat);
+ rd.unmarshal(unmarshallDataFormat);
}
if (getContext().getRegistry().lookupByName("aggregate") != null) {
//aggregation
@@ -170,33 +165,4 @@ public class CamelKafkaConnectMain extends SimpleMain {
return camelMain;
}
}
-
- private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) {
- DataFormat df = camelContext.resolveDataFormat(dataformatName);
-
- if (df == null) {
- df = camelContext.createDataFormat(dataformatName);
-
- final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
- final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
-
- CamelContextAware.trySetCamelContext(df, camelContext);
-
- if (!props.isEmpty()) {
- PropertyBindingSupport.build()
- .withCamelContext(camelContext)
- .withOptionPrefix(prefix)
- .withRemoveParameters(false)
- .withProperties((Map) props)
- .withTarget(df)
- .bind();
- }
- }
-
- //TODO: move it to the caller?
- if (df == null) {
- throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
- }
- return df;
- }
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 0ce5ab0..4e309d4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -20,9 +20,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.camel.component.hl7.HL7DataFormat;
+import org.apache.camel.component.syslog.SyslogDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
-import org.apache.camel.model.dataformat.SyslogDataFormat;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;
@@ -94,9 +94,9 @@ public class DataFormatTest {
dcc.getRegistry().bind("syslog", syslogDf);
cms.start();
- HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+ HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
assertNotNull(hl7dfLoaded);
- SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class);
+ SyslogDataFormat syslogDfLoaded = (SyslogDataFormat)dcc.resolveDataFormat("syslog");
assertNotNull(syslogDfLoaded);
cms.stop();
}
@@ -119,7 +119,7 @@ public class DataFormatTest {
dcc.getRegistry().bind("hl7", hl7df);
cms.start();
- HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+ HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
assertFalse(hl7dfLoaded.isValidate());
cms.stop();
}
@@ -139,9 +139,8 @@ public class DataFormatTest {
.withMarshallDataFormat("hl7")
.build(dcc);
-
cms.start();
- HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+ HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
assertTrue(hl7dfLoaded.isValidate());
cms.stop();
}