You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/09 15:52:03 UTC

[camel-kafka-connector] branch camel-master updated (7f8ed58 -> 3648372)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 7f8ed58  Aligned with 3.6.0-SNAPSHOT
     new 7c8bd3f  core: switch to  SimpleMain as base class for CamelKafkaConnectMain
     new 3648372  core: use camel's built-in support for configuring data formats trough properties #497

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../utils/CamelKafkaConnectMain.java               | 90 ++--------------------
 .../camel/kafkaconnector/DataFormatTest.java       | 11 ++-
 2 files changed, 13 insertions(+), 88 deletions(-)


[camel-kafka-connector] 01/02: core: switch to SimpleMain as base class for CamelKafkaConnectMain

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7c8bd3fce34bd36e53242542a624f346573fe0dc
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 9 10:41:59 2020 +0200

    core: switch to  SimpleMain as base class for CamelKafkaConnectMain
---
 .../utils/CamelKafkaConnectMain.java               | 48 ++--------------------
 1 file changed, 4 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 450360f..191feb0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -27,8 +27,7 @@ import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
-import org.apache.camel.main.BaseMainSupport;
-import org.apache.camel.main.MainListener;
+import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.support.PropertyBindingSupport;
@@ -37,7 +36,7 @@ import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CamelKafkaConnectMain extends BaseMainSupport {
+public class CamelKafkaConnectMain extends SimpleMain {
     public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
     private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
 
@@ -45,57 +44,18 @@ public class CamelKafkaConnectMain extends BaseMainSupport {
     protected volatile ProducerTemplate producerTemplate;
 
     public CamelKafkaConnectMain(CamelContext context) {
-        this.camelContext = context;
-    }
-
-    @Override
-    protected void doInit() throws Exception {
-        super.doInit();
-        postProcessCamelContext(camelContext);
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        LOG.info("Starting Main");
-
-        for (MainListener listener : listeners) {
-            listener.beforeStart(this);
-        }
-
-        super.doStart();
-
-        getCamelContext().start();
-
-        for (MainListener listener : listeners) {
-            listener.afterStart(this);
-        }
-
-        LOG.info("Main started");
+        super(context);
     }
 
     @Override
     protected void doStop() throws Exception {
-        LOG.info("Stopping Main");
-
         ServiceHelper.stopService(consumerTemplate);
         consumerTemplate = null;
 
         ServiceHelper.stopService(producerTemplate);
         producerTemplate = null;
 
-        for (MainListener listener : listeners) {
-            listener.beforeStop(this);
-        }
-
-        super.doStart();
-
-        getCamelContext().stop();
-
-        for (MainListener listener : listeners) {
-            listener.afterStop(this);
-        }
-
-        LOG.info("Main stopped");
+        super.doStop();
     }
 
     public ProducerTemplate getProducerTemplate() {


[camel-kafka-connector] 02/02: core: use camel's built-in support for configuring data formats trough properties #497

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3648372da406734a8cb3bc5aa8fd706fe233eb11
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 9 10:48:32 2020 +0200

    core: use camel's built-in support for configuring data formats trough properties #497
---
 .../utils/CamelKafkaConnectMain.java               | 42 +++-------------------
 .../camel/kafkaconnector/DataFormatTest.java       | 11 +++---
 2 files changed, 9 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 191feb0..36ec56a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -22,15 +22,12 @@ import java.util.Properties;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.kafkaconnector.CamelConnectorConfig;
 import org.apache.camel.main.SimpleMain;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -144,14 +141,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
                     //dataformats
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        LOG.info(".marshal().custom({})", marshallDataFormat);
-                        getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat));
-                        rd.marshal().custom(marshallDataFormat);
+                        LOG.info(".marshal({})", marshallDataFormat);
+                        rd.marshal(marshallDataFormat);
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        LOG.info(".unmarshal().custom({})", unmarshallDataFormat);
-                        getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat));
-                        rd.unmarshal().custom(unmarshallDataFormat);
+                        LOG.info(".unmarshal({})", unmarshallDataFormat);
+                        rd.unmarshal(unmarshallDataFormat);
                     }
                     if (getContext().getRegistry().lookupByName("aggregate") != null) {
                         //aggregation
@@ -170,33 +165,4 @@ public class CamelKafkaConnectMain extends SimpleMain {
             return camelMain;
         }
     }
-
-    private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) {
-        DataFormat df = camelContext.resolveDataFormat(dataformatName);
-
-        if (df == null) {
-            df = camelContext.createDataFormat(dataformatName);
-
-            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
-            final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
-
-            CamelContextAware.trySetCamelContext(df, camelContext);
-
-            if (!props.isEmpty()) {
-                PropertyBindingSupport.build()
-                    .withCamelContext(camelContext)
-                    .withOptionPrefix(prefix)
-                    .withRemoveParameters(false)
-                    .withProperties((Map) props)
-                    .withTarget(df)
-                    .bind();
-            }
-        }
-
-        //TODO: move it to the caller?
-        if (df == null) {
-            throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
-        }
-        return df;
-    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 0ce5ab0..4e309d4 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -20,9 +20,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
+import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
-import org.apache.camel.model.dataformat.SyslogDataFormat;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
 
@@ -94,9 +94,9 @@ public class DataFormatTest {
         dcc.getRegistry().bind("syslog", syslogDf);
 
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertNotNull(hl7dfLoaded);
-        SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class);
+        SyslogDataFormat syslogDfLoaded = (SyslogDataFormat)dcc.resolveDataFormat("syslog");
         assertNotNull(syslogDfLoaded);
         cms.stop();
     }
@@ -119,7 +119,7 @@ public class DataFormatTest {
         dcc.getRegistry().bind("hl7", hl7df);
 
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertFalse(hl7dfLoaded.isValidate());
         cms.stop();
     }
@@ -139,9 +139,8 @@ public class DataFormatTest {
             .withMarshallDataFormat("hl7")
             .build(dcc);
 
-
         cms.start();
-        HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
+        HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7");
         assertTrue(hl7dfLoaded.isValidate());
         cms.stop();
     }