You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/07 21:01:52 UTC

[camel-kafka-connector] 02/04: [core] cleanup CamelSourceTask

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e6049ee4cbdbdde6d4c7912371eb818a704acb8f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 17:10:10 2020 +0200

    [core] cleanup CamelSourceTask
---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 42d9214..604b9b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -22,7 +22,6 @@ import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -58,15 +57,14 @@ public class CamelSourceTask extends SourceTask {
     private CamelKafkaConnectMain cms;
     private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
-    private String topic;
-    private List<String> topics;
+    private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
     private String camelMessageHeaderKey;
 
     @Override
     public String version() {
-        return new CamelSourceConnector().version();
+        return VersionUtil.getVersion();
     }
 
     @Override
@@ -85,8 +83,7 @@ public class CamelSourceTask extends SourceTask {
             final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
 
-            topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
-            topics = Arrays.asList(topic.split(","));
+            topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
 
@@ -205,7 +202,7 @@ public class CamelSourceTask extends SourceTask {
     }
 
     protected Map<String, String> getDefaultConfig() {
-        return Collections.EMPTY_MAP;
+        return Collections.emptyMap();
     }
 
     protected static String getCamelSourceEndpointConfigPrefix() {
@@ -236,9 +233,7 @@ public class CamelSourceTask extends SourceTask {
             } else if (value instanceof Timestamp) {
                 record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
             } else if (value instanceof Date) {
-                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-                String convertedDate = sdf.format(value);
-                record.headers().addString(keyCamelHeader, (String)convertedDate);
+                record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value));
             } else if (value instanceof BigDecimal) {
                 Schema schema = Decimal.schema(((BigDecimal)value).scale());
                 record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);