You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/07 21:01:52 UTC
[camel-kafka-connector] 02/04: [core] cleanup CamelSourceTask
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6049ee4cbdbdde6d4c7912371eb818a704acb8f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 17:10:10 2020 +0200
[core] cleanup CamelSourceTask
---
.../org/apache/camel/kafkaconnector/CamelSourceTask.java | 15 +++++----------
1 file changed, 5 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 42d9214..604b9b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -22,7 +22,6 @@ import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -58,15 +57,14 @@ public class CamelSourceTask extends SourceTask {
private CamelKafkaConnectMain cms;
private CamelSourceConnectorConfig config;
private PollingConsumer consumer;
- private String topic;
- private List<String> topics;
+ private String[] topics;
private Long maxBatchPollSize;
private Long maxPollDuration;
private String camelMessageHeaderKey;
@Override
public String version() {
- return new CamelSourceConnector().version();
+ return VersionUtil.getVersion();
}
@Override
@@ -85,8 +83,7 @@ public class CamelSourceTask extends SourceTask {
final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
- topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
- topics = Arrays.asList(topic.split(","));
+ topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
String localUrl = getLocalUrlWithPollingOptions(config);
@@ -205,7 +202,7 @@ public class CamelSourceTask extends SourceTask {
}
protected Map<String, String> getDefaultConfig() {
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
}
protected static String getCamelSourceEndpointConfigPrefix() {
@@ -236,9 +233,7 @@ public class CamelSourceTask extends SourceTask {
} else if (value instanceof Timestamp) {
record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
} else if (value instanceof Date) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- String convertedDate = sdf.format(value);
- record.headers().addString(keyCamelHeader, (String)convertedDate);
+ record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value));
} else if (value instanceof BigDecimal) {
Schema schema = Decimal.schema(((BigDecimal)value).scale());
record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);