You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2020/11/26 20:43:16 UTC

[camel-kafka-connector] branch master updated: fix #738 : added timestamp information to source records.

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b58ec  fix #738 : added timestamp information to source records.
f1b58ec is described below

commit f1b58ecac6e97eee22166893f02bc6347fd39387
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Thu Nov 26 17:25:53 2020 +0100

    fix #738 : added timestamp information to source records.
---
 .../kafkaconnector/timer/CamelTimerSourceTask.java | 22 ++++++++++++++++++----
 .../camel/kafkaconnector/CamelSourceTask.java      | 10 ++++++++--
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  2 ++
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
index 3dadb5a..b07fdfe 100644
--- a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
+++ b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.timer;
 
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Generated;
+import org.apache.camel.Exchange;
 import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
 import org.apache.camel.kafkaconnector.CamelSourceTask;
 
-@Generated("This class has been generated by camel-kafka-connector-generator-maven-plugin, remove this annotation to prevent it from being generated.")
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
 public class CamelTimerSourceTask extends CamelSourceTask {
 
     @Override
@@ -36,4 +37,17 @@ public class CamelTimerSourceTask extends CamelSourceTask {
             put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
         }};
     }
+
+    //XXX: hand-written override, the only difference from the class generated by camel-kafka-connector-generator-maven-plugin: uses the Exchange.TIMER_FIRED_TIME exchange property (a Date) as the record timestamp in epoch millis, falling back to the superclass value when the exchange or that property is null
+    @Override
+    protected long calculateTimestamp(Exchange exchange) {
+        if (exchange != null) {
+            Date fireDate = exchange.getProperty(Exchange.TIMER_FIRED_TIME, Date.class);
+            if (fireDate != null) {
+                return fireDate.getTime();
+            }
+        }
+
+        return super.calculateTimestamp(exchange);
+    }
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index dd2ddc9..f196c25 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -173,9 +173,11 @@ public class CamelSourceTask extends SourceTask {
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
+            final long timestamp = calculateTimestamp(exchange);
+
             for (String singleTopic : topics) {
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema,
-                        messageHeaderKey, messageBodySchema, messageBodyValue);
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+                        messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
 
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
@@ -240,6 +242,10 @@ public class CamelSourceTask extends SourceTask {
         return CAMEL_SOURCE_PATH_PROPERTIES_PREFIX;
     }
 
+    protected long calculateTimestamp(Exchange exchange) { // default timestamp: current time in epoch millis; the exchange is unused here, subclasses may override to derive a timestamp from it
+        return System.currentTimeMillis();
+    }
+
     private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map, String prefix) {
 
         for (Map.Entry<String, Object> entry : map.entrySet()) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 49bf878..2a85664 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
 import static org.apache.camel.util.CollectionHelper.mapOf;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -67,6 +68,7 @@ public class CamelSourceTaskTest {
 
         assertEquals(size, poll.size());
         assertEquals(TOPIC_NAME, poll.get(0).topic());
+        assertNotNull(poll.get(0).timestamp());
         assertEquals(LoggingLevel.OFF.toString(), sourceTask.getCamelSourceConnectorConfig(props)
             .getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF));