You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2020/11/26 20:43:16 UTC
[camel-kafka-connector] branch master updated: fix #738 : added timestamp information to source records.
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f1b58ec fix #738 : added timestamp information to source records.
f1b58ec is described below
commit f1b58ecac6e97eee22166893f02bc6347fd39387
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Thu Nov 26 17:25:53 2020 +0100
fix #738 : added timestamp information to source records.
---
.../kafkaconnector/timer/CamelTimerSourceTask.java | 22 ++++++++++++++++++----
.../camel/kafkaconnector/CamelSourceTask.java | 10 ++++++++--
.../camel/kafkaconnector/CamelSourceTaskTest.java | 2 ++
3 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
index 3dadb5a..b07fdfe 100644
--- a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
+++ b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java
@@ -16,13 +16,14 @@
*/
package org.apache.camel.kafkaconnector.timer;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Generated;
+import org.apache.camel.Exchange;
import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.camel.kafkaconnector.CamelSourceTask;
-@Generated("This class has been generated by camel-kafka-connector-generator-maven-plugin, remove this annotation to prevent it from being generated.")
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
public class CamelTimerSourceTask extends CamelSourceTask {
@Override
@@ -36,4 +37,17 @@ public class CamelTimerSourceTask extends CamelSourceTask {
put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
}};
}
+
+ //XXX: this method override is the only difference from how the class was initially generated by camel-kafka-connector-generator-maven-plugin
+ @Override
+ protected long calculateTimestamp(Exchange exchange) {
+ if (exchange != null) {
+ Date fireDate = exchange.getProperty(Exchange.TIMER_FIRED_TIME, Date.class);
+ if (fireDate != null) {
+ return fireDate.getTime();
+ }
+ }
+
+ return super.calculateTimestamp(exchange);
+ }
}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index dd2ddc9..f196c25 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -173,9 +173,11 @@ public class CamelSourceTask extends SourceTask {
final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+ final long timestamp = calculateTimestamp(exchange);
+
for (String singleTopic : topics) {
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema,
- messageHeaderKey, messageBodySchema, messageBodyValue);
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
+ messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
@@ -240,6 +242,10 @@ public class CamelSourceTask extends SourceTask {
return CAMEL_SOURCE_PATH_PROPERTIES_PREFIX;
}
+ protected long calculateTimestamp(Exchange exchange) {
+ return System.currentTimeMillis();
+ }
+
private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map, String prefix) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 49bf878..2a85664 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
import static org.apache.camel.util.CollectionHelper.mapOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -67,6 +68,7 @@ public class CamelSourceTaskTest {
assertEquals(size, poll.size());
assertEquals(TOPIC_NAME, poll.get(0).topic());
+ assertNotNull(poll.get(0).timestamp());
assertEquals(LoggingLevel.OFF.toString(), sourceTask.getCamelSourceConnectorConfig(props)
.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF));