You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 17:27:08 UTC
[camel-kafka-connector] branch master updated: source: better
handling of headers of type Data #544
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7fd48c1 source: better handling of headers of type Data #544
7fd48c1 is described below
commit 7fd48c150172fa9419fe33dde560254bb35845cc
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 16:51:06 2020 +0200
source: better handling of headers of type Data #544
---
.../camel/kafkaconnector/CamelConnectorUtils.java | 49 ++++++++++++++++++++++
.../apache/camel/kafkaconnector/CamelSinkTask.java | 13 +++++-
.../camel/kafkaconnector/CamelSourceTask.java | 9 +---
.../camel/kafkaconnector/CamelSinkTaskTest.java | 32 ++++++++++++++
.../camel/kafkaconnector/CamelSourceTaskTest.java | 30 +++++++++++++
5 files changed, 123 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java
new file mode 100644
index 0000000..fa6a3e5
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+public final class CamelConnectorUtils {
+ public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+ private CamelConnectorUtils() {
+ }
+
+ public static Date truncateDate(Date value) {
+ Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC);
+ calendar.setTime(value);
+ calendar.set(Calendar.YEAR, 0);
+ calendar.set(Calendar.MONTH, 0);
+ calendar.set(Calendar.DAY_OF_MONTH, 0);
+
+ return calendar.getTime();
+ }
+
+ public static Date truncateTime(Date value) {
+ Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC);
+ calendar.setTime(value);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+
+ return calendar.getTime();
+ }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 51cfe88..63e139c 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,9 +18,11 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -35,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -172,7 +175,10 @@ public class CamelSinkTask extends SinkTask {
private void addHeader(Map<String, Object> map, Header singleHeader) {
String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX);
Schema schema = singleHeader.schema();
- if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
+
+ if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
+ map.put(camelHeaderKey, (Date)singleHeader.value());
+ } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
map.put(camelHeaderKey, (String)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
map.put(camelHeaderKey, (Boolean)singleHeader.value());
@@ -204,7 +210,10 @@ public class CamelSinkTask extends SinkTask {
private void addProperty(Exchange exchange, Header singleHeader) {
String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX);
Schema schema = singleHeader.schema();
- if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
+
+ if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
+ exchange.getProperties().put(camelPropertyKey, (Date)singleHeader.value());
+ } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 724ddd7..c793f1b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -17,9 +17,6 @@
package org.apache.camel.kafkaconnector;
import java.math.BigDecimal;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -237,12 +234,8 @@ public class CamelSourceTask extends SourceTask {
}
record.headers().addBytes(keyCamelHeader, bytes);
- } else if (value instanceof Time) {
- record.headers().addTime(keyCamelHeader, (Time)value);
- } else if (value instanceof Timestamp) {
- record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
} else if (value instanceof Date) {
- record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value));
+ record.headers().addTimestamp(keyCamelHeader, (Date)value);
} else if (value instanceof BigDecimal) {
Schema schema = Decimal.schema(((BigDecimal)value).scale());
record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 6e28b0e..4a8fc9e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -691,4 +693,34 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
+ @Test
+ public void testBodyAndDateHeader() {
+ final Date now = new Date();
+
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ try {
+ List<SinkRecord> records = new ArrayList<>();
+
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+ records.add(record);
+
+ sinkTask.put(records);
+
+ Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+
+ assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+ assertThat(value).isEqualTo(now);
+ });
+ } finally {
+ sinkTask.stop();
+ }
+ }
+
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 25088cf..ef393f9 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -329,6 +330,35 @@ public class CamelSourceTaskTest {
}
@Test
+ public void testSourceDateHeader() {
+ final String key = "_key";
+ final Date now = new Date();
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(mapOf(
+ CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+ CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+ CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+ ));
+
+ sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", key, now);
+
+ try {
+ List<SourceRecord> results = sourceTask.poll();
+ assertThat(results).hasSize(1);
+
+ Header header = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + key).next();
+
+ assertThat(header.schema().type()).isEqualTo(Schema.Type.INT64);
+ assertThat(header.value()).isInstanceOfSatisfying(Date.class, value -> {
+ assertThat(value).isEqualTo(now);
+ });
+ } finally {
+ sourceTask.stop();
+ }
+ }
+
+ @Test
public void testSourcePollingWithAggregationBySize() {
final int size = 10;
final int chunkSize = 5;