You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/08 17:27:08 UTC

[camel-kafka-connector] branch master updated: source: better handling of headers of type Data #544

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fd48c1  source: better handling of headers of type Data #544
7fd48c1 is described below

commit 7fd48c150172fa9419fe33dde560254bb35845cc
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 8 16:51:06 2020 +0200

    source: better handling of headers of type Data #544
---
 .../camel/kafkaconnector/CamelConnectorUtils.java  | 49 ++++++++++++++++++++++
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 13 +++++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  9 +---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 32 ++++++++++++++
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 30 +++++++++++++
 5 files changed, 123 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java
new file mode 100644
index 0000000..fa6a3e5
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Static helpers for normalising {@link Date} values against the UTC time zone.
+ *
+ * NOTE(review): presumably meant to prepare values for Kafka Connect's Time/Date
+ * logical types - confirm against the callers.
+ */
+public final class CamelConnectorUtils {
+    /** Time zone in which every truncation performed by this class is evaluated. */
+    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    private CamelConnectorUtils() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Drops the date portion of the given value, keeping only its UTC time of day.
+     *
+     * @param value the instant to strip the date from, must not be null
+     * @return a new {@link Date} on 1970-01-01 (UTC) carrying the same hour, minute,
+     *         second and millisecond as {@code value}; its epoch offset is always in
+     *         the range {@code [0, 86_400_000)}
+     */
+    public static Date truncateDate(Date value) {
+        Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC);
+        calendar.setTime(value);
+
+        // Build the result from the time-of-day fields instead of overwriting the
+        // date fields: setting YEAR/DAY_OF_MONTH to 0 on a lenient calendar rolls
+        // back into the previous month of a BC year rather than to the epoch day.
+        long millisOfDay = calendar.get(Calendar.HOUR_OF_DAY);
+        millisOfDay = millisOfDay * 60L + calendar.get(Calendar.MINUTE);
+        millisOfDay = millisOfDay * 60L + calendar.get(Calendar.SECOND);
+        millisOfDay = millisOfDay * 1000L + calendar.get(Calendar.MILLISECOND);
+
+        return new Date(millisOfDay);
+    }
+
+    /**
+     * Drops the time portion of the given value, keeping only its UTC calendar day.
+     *
+     * @param value the instant to strip the time of day from, must not be null
+     * @return a new {@link Date} set to midnight (00:00:00.000 UTC) of the day
+     *         {@code value} falls on
+     */
+    public static Date truncateTime(Date value) {
+        Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC);
+        calendar.setTime(value);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        return calendar.getTime();
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 51cfe88..63e139c 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,9 +18,11 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -35,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -172,7 +175,10 @@ public class CamelSinkTask extends SinkTask {
     private void addHeader(Map<String, Object> map, Header singleHeader) {
         String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX);
         Schema schema = singleHeader.schema();
-        if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
+
+        if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
+            map.put(camelHeaderKey, (Date)singleHeader.value());
+        } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
             map.put(camelHeaderKey, (String)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
             map.put(camelHeaderKey, (Boolean)singleHeader.value());
@@ -204,7 +210,10 @@ public class CamelSinkTask extends SinkTask {
     private void addProperty(Exchange exchange, Header singleHeader) {
         String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX);
         Schema schema = singleHeader.schema();
-        if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
+
+        if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
+            exchange.getProperties().put(camelPropertyKey, (Date)singleHeader.value());
+        } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
             exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) {
             exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 724ddd7..c793f1b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -17,9 +17,6 @@
 package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -237,12 +234,8 @@ public class CamelSourceTask extends SourceTask {
                 }
 
                 record.headers().addBytes(keyCamelHeader, bytes);
-            } else if (value instanceof Time) {
-                record.headers().addTime(keyCamelHeader, (Time)value);
-            } else if (value instanceof Timestamp) {
-                record.headers().addTimestamp(keyCamelHeader, (Timestamp)value);
             } else if (value instanceof Date) {
-                record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value));
+                record.headers().addTimestamp(keyCamelHeader, (Date)value);
             } else if (value instanceof BigDecimal) {
                 Schema schema = Decimal.schema(((BigDecimal)value).scale());
                 record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 6e28b0e..4a8fc9e 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -691,4 +693,34 @@ public class CamelSinkTaskTest {
         sinkTask.stop();
     }
 
+    /**
+     * Verifies that a Kafka Connect timestamp header carrying the Camel header prefix
+     * reaches the Camel exchange as a {@link Date} header with the same value.
+     */
+    @Test
+    public void testBodyAndDateHeader() {
+        final Date now = new Date();
+
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        try {
+            List<SinkRecord> records = new ArrayList<>();
+
+            SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+            record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now);
+            records.add(record);
+
+            sinkTask.put(records);
+
+            Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT);
+
+            // A timed-out receive yields no exchange; fail with a clear assertion
+            // instead of a NullPointerException on the next line.
+            assertThat(exchange).isNotNull();
+            assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> {
+                assertThat(value).isEqualTo(now);
+            });
+        } finally {
+            sinkTask.stop();
+        }
+    }
+
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 25088cf..ef393f9 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -329,6 +330,35 @@ public class CamelSourceTaskTest {
     }
 
     @Test
+    public void testSourceDateHeader() {
+        final String key = "_key";
+        final Date now = new Date();
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(mapOf(
+            CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME,
+            CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct",
+            CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start"
+        ));
+
+        sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", key, now);
+
+        try {
+            List<SourceRecord> results = sourceTask.poll();
+            assertThat(results).hasSize(1);
+
+            Header header = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + key).next();
+
+            assertThat(header.schema().type()).isEqualTo(Schema.Type.INT64);
+            assertThat(header.value()).isInstanceOfSatisfying(Date.class, value -> {
+                assertThat(value).isEqualTo(now);
+            });
+        } finally {
+            sourceTask.stop();
+        }
+    }
+
+    @Test
     public void testSourcePollingWithAggregationBySize() {
         final int size = 10;
         final int chunkSize = 5;