Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/07 21:01:50 UTC

[camel-kafka-connector] branch master updated (7f46db0 -> 8dde130)

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 7f46db0  Catalog: Rename the json file to reflect the id and connectors.properties content - regeneration
     new e5184d8  [core] refactor camel main
     new e6049ee  [core] cleanup CamelSourceTask
     new be4ad31  [core] cleanup CamelSinkTask
     new 8dde130  [core] cleanup TaskHelper

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  35 ++---
 .../camel/kafkaconnector/CamelSourceTask.java      |  43 +++---
 .../utils/CamelKafkaConnectMain.java               | 129 ++++++++++++++++
 .../kafkaconnector/utils/CamelMainSupport.java     | 165 ---------------------
 .../camel/kafkaconnector/utils/TaskHelper.java     |   6 +-
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  38 ++---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  18 +--
 .../camel/kafkaconnector/DataFormatTest.java       |  31 ++--
 .../kafkaconnector/PropertiesNameFormatsTest.java  |   4 +-
 9 files changed, 211 insertions(+), 258 deletions(-)
 delete mode 100644 core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java


[camel-kafka-connector] 03/04: [core] cleanup CamelSinkTask

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit be4ad310200b1a5ec775502277988e94a84e499d
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 17:48:22 2020 +0200

    [core] cleanup CamelSinkTask
---
 .../java/org/apache/camel/kafkaconnector/CamelSinkTask.java    | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index c39a4f4..3ac39e6 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,7 +19,6 @@ package org.apache.camel.kafkaconnector;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +60,7 @@ public class CamelSinkTask extends SinkTask {
 
     @Override
     public String version() {
-        return new CamelSinkConnector().version();
+        return VersionUtil.getVersion();
     }
 
     @Override
@@ -108,7 +107,7 @@ public class CamelSinkTask extends SinkTask {
     }
 
     protected Map<String, String> getDefaultConfig() {
-        return Collections.EMPTY_MAP;
+        return Collections.emptyMap();
     }
 
     protected static String getCamelSinkEndpointConfigPrefix() {
@@ -123,11 +122,10 @@ public class CamelSinkTask extends SinkTask {
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
             TaskHelper.logRecordContent(LOG, record, config);
-            Map<String, Object> headers = new HashMap<String, Object>();
+            Map<String, Object> headers = new HashMap<>();
             Exchange exchange = new DefaultExchange(producer.getCamelContext());
             headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
-            for (Iterator<Header> iterator = record.headers().iterator(); iterator.hasNext();) {
-                Header header = (Header)iterator.next();
+            for (Header header : record.headers()) {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
                     addHeader(headers, header);
                 } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {


[camel-kafka-connector] 01/04: [core] refactor camel main

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e5184d8f610bda5a3db886d03e93d239c172587f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 15:57:57 2020 +0200

    [core] refactor camel main
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  25 ++--
 .../camel/kafkaconnector/CamelSourceTask.java      |  28 ++--
 .../utils/CamelKafkaConnectMain.java               | 129 ++++++++++++++++
 .../kafkaconnector/utils/CamelMainSupport.java     | 165 ---------------------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  38 ++---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  18 +--
 .../camel/kafkaconnector/DataFormatTest.java       |  31 ++--
 .../kafkaconnector/PropertiesNameFormatsTest.java  |   4 +-
 8 files changed, 199 insertions(+), 239 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index c6242cb..c39a4f4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -29,8 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.commons.lang3.StringUtils;
@@ -57,7 +55,7 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
 
 
-    private CamelMainSupport cms;
+    private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private CamelSinkConnectorConfig config;
 
@@ -76,13 +74,6 @@ public class CamelSinkTask extends SinkTask {
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
             final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
-            List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-            if (unmarshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
-            }
-            if (marshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-            }
             final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
             final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
 
@@ -95,9 +86,15 @@ public class CamelSinkTask extends SinkTask {
                                                 CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, dataformats, size, timeout, camelContext);
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+                .withProperties(actualProps)
+                .withUnmarshallDataFormat(unmarshaller)
+                .withMarshallDataFormat(marshaller)
+                .withAggregationSize(size)
+                .withAggregationTimeout(timeout)
+                .build(camelContext);
 
-            producer = cms.createProducerTemplate();
+            producer = cms.getProducerTemplate();
 
             cms.start();
             LOG.info("CamelSinkTask connector task started");
@@ -229,7 +226,7 @@ public class CamelSinkTask extends SinkTask {
         }
     }
 
-    public CamelMainSupport getCms() {
+    CamelKafkaConnectMain getCms() {
         return cms;
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e825e49..42d9214 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -25,18 +25,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.kafka.connect.data.Decimal;
@@ -58,7 +55,7 @@ public class CamelSourceTask extends SourceTask {
 
     private static final String LOCAL_URL = "direct:end";
 
-    private CamelMainSupport cms;
+    private CamelKafkaConnectMain cms;
     private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
     private String topic;
@@ -87,13 +84,7 @@ public class CamelSourceTask extends SourceTask {
             String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
-            List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-            if (unmarshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
-            }
-            if (marshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-            }
+
             topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
             topics = Arrays.asList(topic.split(","));
 
@@ -106,10 +97,15 @@ public class CamelSourceTask extends SourceTask {
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, dataformats, 10, 500, camelContext);
+            cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+                .withProperties(actualProps)
+                .withUnmarshallDataFormat(unmarshaller)
+                .withMarshallDataFormat(marshaller)
+                .withAggregationSize(10)
+                .withAggregationTimeout(500)
+                .build(camelContext);
 
-            Endpoint endpoint = cms.getEndpoint(localUrl);
-            consumer = endpoint.createPollingConsumer();
+            consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
             consumer.start();
 
             cms.start();
@@ -268,7 +264,7 @@ public class CamelSourceTask extends SourceTask {
                + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
     }
 
-    public CamelMainSupport getCms() {
+    CamelKafkaConnectMain getCms() {
         return cms;
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 46b0822..4150dea 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -16,12 +16,23 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,4 +130,122 @@ public class CamelKafkaConnectMain extends BaseMainSupport {
 
         return this.consumerTemplate;
     }
+
+    public static Builder builder(String from, String to) {
+        return new Builder(from, to);
+    }
+
+    public static final class Builder {
+        private final String from;
+        private final String to;
+        private Map<String, String> props;
+        private String marshallDataFormat;
+        private String unmarshallDataFormat;
+        private int aggregationSize;
+        private long aggregationTimeout;
+
+        public Builder(String from, String to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        public Builder withProperties(Map<String, String> props) {
+            this.props = new HashMap<>(props);
+            return this;
+        }
+
+        public Builder withMarshallDataFormat(String dataformatId) {
+            this.marshallDataFormat = dataformatId;
+            return this;
+        }
+
+        public Builder withUnmarshallDataFormat(String dataformatId) {
+            this.unmarshallDataFormat = dataformatId;
+            return this;
+        }
+
+        public Builder withAggregationSize(int aggregationSize) {
+            this.aggregationSize = aggregationSize;
+            return this;
+        }
+
+        public Builder withAggregationTimeout(long aggregationTimeout) {
+            this.aggregationTimeout = aggregationTimeout;
+            return this;
+        }
+
+        public CamelKafkaConnectMain build(CamelContext camelContext) {
+            CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
+            camelMain.configure().setAutoConfigurationLogSummary(false);
+
+            Properties camelProperties = new Properties();
+            camelProperties.putAll(props);
+
+            LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
+            camelMain.setInitialProperties(camelProperties);
+
+            //creating the actual route
+            camelMain.configure().addRoutesBuilder(new RouteBuilder() {
+                public void configure() {
+                    //from
+                    RouteDefinition rd = from(from);
+                    LOG.info("Creating Camel route from({})", from);
+
+                    //dataformats
+                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                        LOG.info(".marshal().custom({})", marshallDataFormat);
+                        getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat));
+                        rd.marshal().custom(marshallDataFormat);
+                    }
+                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                        LOG.info(".unmarshal().custom({})", unmarshallDataFormat);
+                        getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat));
+                        rd.unmarshal().custom(unmarshallDataFormat);
+                    }
+                    if (getContext().getRegistry().lookupByName("aggregate") != null) {
+                        //aggregation
+                        AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
+                        LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
+                        LOG.info(".to({})", to);
+                        rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                    } else {
+                        //to
+                        LOG.info(".to({})", to);
+                        rd.toD(to);
+                    }
+                }
+            });
+
+            return camelMain;
+        }
+    }
+
+    private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) {
+        DataFormat df = camelContext.resolveDataFormat(dataformatName);
+
+        if (df == null) {
+            df = camelContext.createDataFormat(dataformatName);
+
+            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
+            final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
+
+            CamelContextAware.trySetCamelContext(df, camelContext);
+
+            if (!props.isEmpty()) {
+                PropertyBindingSupport.build()
+                    .withCamelContext(camelContext)
+                    .withOptionPrefix(prefix)
+                    .withRemoveParameters(false)
+                    .withProperties((Map) props)
+                    .withTarget(df)
+                    .bind();
+            }
+        }
+
+        //TODO: move it to the caller?
+        if (df == null) {
+            throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
+        }
+        return df;
+    }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
deleted file mode 100644
index 6c17ecc..0000000
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.utils;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.camel.AggregationStrategy;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Endpoint;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.support.PropertyBindingSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CamelMainSupport {
-    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
-    private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
-
-    private final CamelKafkaConnectMain camelMain;
-
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) {
-        camelMain = new CamelKafkaConnectMain(camelContext);
-        camelMain.configure().setAutoConfigurationLogSummary(false);
-
-        Properties camelProperties = new Properties();
-        camelProperties.putAll(props);
-
-        LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
-        camelMain.setInitialProperties(camelProperties);
-
-        //creating the actual route
-        camelMain.configure().addRoutesBuilder(new RouteBuilder() {
-            public void configure() {
-                //from
-                RouteDefinition rd = from(fromUrl);
-
-                //dataformats
-                LOG.info("Creating Camel route from({})", fromUrl);
-                for (CamelKafkaConnectDataformat dataformat : dataformats) {
-                    String dataformatId = dataformat.getDataformatId();
-                    switch (dataformat.getDataformatKind()) {
-                        case MARSHALL:
-                            LOG.info(".marshal().custom({})", dataformatId);
-                            getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
-                            rd.marshal().custom(dataformatId);
-                            break;
-                        case UNMARSHALL:
-                            LOG.info(".unmarshal().custom({})", dataformatId);
-                            getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
-                            rd.unmarshal().custom(dataformatId);
-                            break;
-                        default:
-                            throw new UnsupportedOperationException("Unsupported dataformat: " + dataformat);
-                    }
-                }
-
-                if (getContext().getRegistry().lookupByName("aggregate") != null) {
-                    //aggregation
-                    AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
-                    LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
-                    LOG.info(".to({})", toUrl);
-                    rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
-                } else {
-                    //to
-                    LOG.info(".to({})", toUrl);
-                    rd.toD(toUrl);
-                }
-            }
-        });
-    }
-
-    public void start() {
-        LOG.info("Starting CamelContext");
-
-        try {
-            camelMain.start();
-        } catch (Exception e) {
-            LOG.info("CamelContext failed to start", e);
-            throw e;
-        }
-
-        LOG.info("CamelContext started");
-    }
-
-    public void stop() {
-        LOG.info("Stopping CamelContext");
-
-        try {
-            camelMain.stop();
-        } catch (Exception e) {
-            LOG.info("CamelContext failed to stop", e);
-            throw e;
-        }
-
-        LOG.info("CamelContext stopped");
-    }
-
-    public ProducerTemplate createProducerTemplate() {
-        return camelMain.getProducerTemplate();
-    }
-
-    public Endpoint getEndpoint(String uri) {
-        return camelMain.getCamelContext().getEndpoint(uri);
-    }
-
-    public Collection<Endpoint> getEndpoints() {
-        return camelMain.getCamelContext().getEndpoints();
-    }
-
-    public ConsumerTemplate createConsumerTemplate() {
-        return camelMain.getConsumerTemplate();
-    }
-
-    private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
-        DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName);
-
-        if (df == null) {
-            df = camelMain.getCamelContext().createDataFormat(dataformatName);
-
-            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
-            final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
-
-            CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext());
-
-            if (!props.isEmpty()) {
-                PropertyBindingSupport.build()
-                        .withCamelContext(camelMain.getCamelContext())
-                        .withOptionPrefix(prefix)
-                        .withRemoveParameters(false)
-                        .withProperties((Map) props)
-                        .withTarget(df)
-                        .bind();
-            }
-        }
-
-        //TODO: move it to the caller?
-        if (df == null) {
-            throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
-        }
-        return df;
-    }
-
-}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index a050943..3136e98 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -58,7 +58,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -67,7 +67,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
@@ -84,7 +84,7 @@ public class CamelSinkTaskTest {
         records.add(record1);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -128,7 +128,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -172,7 +172,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -222,7 +222,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -281,7 +281,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -345,7 +345,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -407,7 +407,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -466,7 +466,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -509,7 +509,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -533,11 +533,11 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertEquals(1, sinkTask.getCms().getEndpoints()
+        assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
 
         sinkTask.stop();
@@ -560,12 +560,12 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
 
-        assertEquals(1, sinkTask.getCms().getEndpoints()
+        assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
 
         sinkTask.stop();
@@ -590,7 +590,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
@@ -616,7 +616,7 @@ public class CamelSinkTaskTest {
         records.add(record5);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -625,7 +625,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -652,7 +652,7 @@ public class CamelSinkTaskTest {
         records.add(record5);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 8fd8cb4..4d268de 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -39,7 +39,7 @@ public class CamelSourceTaskTest {
     private static final String TOPIC_NAME = "my-topic";
 
     private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) {
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
         for (int i = 0; i < size; i++) {
             template.sendBody(DIRECT_URI, "test" + i);
         }
@@ -113,7 +113,7 @@ public class CamelSourceTaskTest {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
 
         // key in the message with body
         template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 1234);
@@ -150,7 +150,7 @@ public class CamelSourceTaskTest {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
 
         // send String
         template.sendBody(DIRECT_URI, "test");
@@ -206,9 +206,9 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getEndpoints().size());
+        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("foo"));
@@ -231,9 +231,9 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getEndpoints().size());
+        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("direct"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("end"));
@@ -259,7 +259,7 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
             .filter(e -> e.getEndpointUri().startsWith("timer"))
             .forEach(e -> {
                 assertTrue(e.getEndpointUri().contains("foo"));
@@ -281,7 +281,7 @@ public class CamelSourceTaskTest {
         sourceTask.start(props);
 
 
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
         template.sendBodyAndHeader(DIRECT_URI, "test", "bigdecimal", new BigDecimal(1234567890));
 
         List<SourceRecord> results = sourceTask.poll();
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 679d554..0ce5ab0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,16 +16,12 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.model.dataformat.SyslogDataFormat;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
@@ -82,12 +78,13 @@ public class DataFormatTest {
         props.put("topics", "mytopic");
         props.put("camel.source.marshal", "hl7");
         props.put("camel.source.unmarshal", "syslog");
-
-        List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-        dataformats.add(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-        dataformats.add(new CamelKafkaConnectDataformat("syslog", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withUnmarshallDataFormat("syslog")
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
 
         HL7DataFormat hl7Df = new HL7DataFormat();
         hl7Df.setValidate(false);
@@ -111,9 +108,11 @@ public class DataFormatTest {
         props.put("topics", "mytopic");
         props.put("camel.source.marshal", "hl7");
 
-        List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
 
         HL7DataFormat hl7df = new HL7DataFormat();
         hl7df.setValidate(false);
@@ -133,9 +132,13 @@ public class DataFormatTest {
         props.put("camel.source.marshal", "hl7");
         props.put("camel.dataformat.hl7.validate", "false");
 
-        List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
+
 
         cms.start();
         HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
index 5b032fe..6e3240c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
@@ -39,7 +39,7 @@ public class PropertiesNameFormatsTest {
 
         CamelSourceTask camelsourceTask = new CamelSourceTask();
         camelsourceTask.start(props);
-        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
+        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
         assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName());
         assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter());
         camelsourceTask.stop();
@@ -55,7 +55,7 @@ public class PropertiesNameFormatsTest {
 
         CamelSourceTask camelsourceTask = new CamelSourceTask();
         camelsourceTask.start(props);
-        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
+        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
         assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName());
         assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter());
         camelsourceTask.stop();
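
To summarize the new API in one place: the refactor replaces CamelMainSupport with a fluent builder on CamelKafkaConnectMain, and the templates are exposed through plain getters rather than create* factory methods. A minimal usage sketch based only on the builder methods and accessors visible in this patch; the endpoint URIs, properties, and message body are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;

public class CamelKafkaConnectMainSketch {
    public static void main(String[] args) {
        // Placeholder connector properties; the tasks pass the merged connector config here.
        Map<String, String> props = new HashMap<>();

        // Route from a local direct endpoint to a log endpoint, mirroring how
        // CamelSinkTask wires LOCAL_URL to the configured remote URL above.
        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct:start", "log:sink")
            .withProperties(props)
            .withAggregationSize(10)
            .withAggregationTimeout(500L)
            .build(new DefaultCamelContext());

        cms.start();

        // getProducerTemplate()/getConsumerTemplate() replace the old create* methods.
        ProducerTemplate producer = cms.getProducerTemplate();
        producer.sendBody("direct:start", "hello");

        cms.stop();
    }
}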


[camel-kafka-connector] 02/04: [core] cleanup CamelSourceTask

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e6049ee4cbdbdde6d4c7912371eb818a704acb8f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 17:10:10 2020 +0200

    [core] cleanup CamelSourceTask
---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 42d9214..604b9b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -22,7 +22,6 @@ import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -58,15 +57,14 @@ public class CamelSourceTask extends SourceTask {
     private CamelKafkaConnectMain cms;
     private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
-    private String topic;
-    private List<String> topics;
+    private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
     private String camelMessageHeaderKey;
 
     @Override
     public String version() {
-        return new CamelSourceConnector().version();
+        return VersionUtil.getVersion();
     }
 
     @Override
@@ -85,8 +83,7 @@ public class CamelSourceTask extends SourceTask {
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
 
-            topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
-            topics = Arrays.asList(topic.split(","));
+            topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
 
@@ -205,7 +202,7 @@ public class CamelSourceTask extends SourceTask {
     }
 
     protected Map<String, String> getDefaultConfig() {
-        return Collections.EMPTY_MAP;
+        return Collections.emptyMap();
     }
 
     protected static String getCamelSourceEndpointConfigPrefix() {
@@ -236,9 +233,7 @@ public class CamelSourceTask extends SourceTask {
             } else if (value instanceof Timestamp) {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
             } else if (value instanceof Date) {
-                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-                String convertedDate = sdf.format(value);
-                record.headers().addString(keyCamelHeader, (String)convertedDate);
+                record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value));
             } else if (value instanceof BigDecimal) {
                 Schema schema = Decimal.schema(((BigDecimal)value).scale());
                 record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);
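
For reference, a self-contained sketch of the header conversions touched in the last hunk: a java.util.Date value is written as a yyyy-MM-dd string, and a BigDecimal is written with Kafka Connect's Decimal logical type, whose schema carries the value's scale (the header keys are placeholders):

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;

public class SourceHeaderConversionSketch {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders();

        // Date headers are formatted inline, as in the change above.
        headers.addString("CamelHeader.date", new SimpleDateFormat("yyyy-MM-dd").format(new Date()));

        // BigDecimal headers use Connect's Decimal logical type; the schema records the scale
        // and the value is stored in its serialized (byte[]) form.
        BigDecimal amount = new BigDecimal("1234.56");
        Schema schema = Decimal.schema(amount.scale());
        headers.add("CamelHeader.amount", Decimal.fromLogical(schema, amount), schema);

        headers.forEach(h -> System.out.println(h.key() + " -> " + h.value()));
    }
}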


[camel-kafka-connector] 04/04: [core] cleanup TaskHelper

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8dde130b9737c865c4795bfeb21958572ff3c4d8
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 18:18:34 2020 +0200

    [core] cleanup TaskHelper
---
 .../main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 6fbeb20..78e489d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -81,21 +81,21 @@ public final class TaskHelper {
                 .filter(k -> k.startsWith(prefix))
                 .map(k -> k.replace(prefix, "") + "=" + props.get(k))
                 .reduce((o1, o2) -> o1 + "&" + o2)
-                .map(result -> (result == null || result.isEmpty()) ? "" : "?" + result)
+                .map(result -> result.isEmpty() ? "" : "?" + result)
                 .orElse("");
     }
 
     public static String createUrlPathFromProperties(Map<String, String> props, String prefix) {
         return props.keySet().stream()
                 .filter(k -> k.startsWith(prefix))
-                .map(k -> props.get(k))
+                .map(props::get)
                 .reduce((p1, p2) -> p1 + ":" + p2)
                 .orElse("");
     }
 
     public static Map<String, String> mergeProperties(Map<String, String> defaultProps, Map<String, String> loadedProps) {
         if (loadedProps == null && defaultProps == null) {
-            return Collections.EMPTY_MAP;
+            return Collections.emptyMap();
         } else if (loadedProps == null) {
             return new HashMap<>(defaultProps);
         } else if (defaultProps == null) {
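
A brief usage sketch of the helpers touched above, using only the signatures visible in this patch; the property keys and prefix are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.kafkaconnector.utils.TaskHelper;

public class TaskHelperSketch {
    public static void main(String[] args) {
        // createUrlPathFromProperties keeps only the keys matching the prefix and joins
        // their values with ':' (full method body shown above).
        Map<String, String> props = new HashMap<>();
        props.put("camel.sink.path.bucketName", "my-bucket");        // placeholder key/value
        props.put("camel.sink.endpoint.autoCreateBucket", "true");   // ignored: wrong prefix
        System.out.println(TaskHelper.createUrlPathFromProperties(props, "camel.sink.path."));
        // prints: my-bucket

        // With both maps null, mergeProperties now returns the type-safe
        // Collections.emptyMap() instead of the raw Collections.EMPTY_MAP constant.
        Map<String, String> merged = TaskHelper.mergeProperties(null, null);
        System.out.println(merged.isEmpty());   // prints: true
    }
}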