You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/10 06:01:06 UTC

[camel] branch master updated: "CAMEL-16414:camel-kafka set custom timestamp" (#5312)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f0c9af  "CAMEL-16414:camel-kafka set custom timestamp" (#5312)
3f0c9af is described below

commit 3f0c9afa0ee30e62f1aea8a6dd08412b8aadbb9b
Author: Ramu <kr...@gmail.com>
AuthorDate: Sat Apr 10 11:30:40 2021 +0530

    "CAMEL-16414:camel-kafka set custom timestamp" (#5312)
    
    Co-authored-by: Kodanda Ramu Kakarla <kk...@kkakarla.pnq.csb>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  1 +
 .../camel/component/kafka/KafkaConstants.java      |  1 +
 .../camel/component/kafka/KafkaProducer.java       | 19 ++++++++++++++--
 .../camel/component/kafka/KafkaProducerTest.java   | 26 ++++++++++++++++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 397f064..da2db71 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -310,6 +310,7 @@ Before sending a message to Kafka you can configure the following headers.
 | Header constant              | Header value          | Type    | Description
 | KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* The key of the message in order to ensure that all related message goes in the same partition
 | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic to which send the message (override and takes precedence), and the header is not preserved.
+| KafkaConstants.OVERRIDE_TIMESTAMP | "kafka.OVERRIDE_TIMESTAMP" | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the  record with the provided timestamp and the header is not preserved. 
 | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition
 |===
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 785fc21..edc2dcb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -28,6 +28,7 @@ public final class KafkaConstants {
     public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
     public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
     public static final String TIMESTAMP = "kafka.TIMESTAMP";
+    public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP";
 
     @Deprecated
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 32aa440..f720f2b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -146,6 +146,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
         String topic = endpoint.getConfiguration().getTopic();
+        Long timeStamp = null;
 
         // must remove header so its not propagated
         Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
@@ -160,6 +161,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
             topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
         }
 
+        Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+        if ((overrideTimeStamp != null) && (overrideTimeStamp instanceof Long)) {
+            LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
+            timeStamp = ((Long) overrideTimeStamp).longValue();
+        }
+
         // extracting headers which need to be propagated
         List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
 
@@ -189,6 +196,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     String innerTopic = msgTopic;
                     Object innerKey = null;
                     Integer innerPartitionKey = null;
+                    Long innerTimestamp = null;
 
                     Object value = next;
                     Exchange ex = null;
@@ -223,6 +231,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
                             }
                         }
 
+                        if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) {
+                            if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) instanceof Long) {
+                                innerTimestamp
+                                        = ((Long) innerMmessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP)).longValue();
+                            }
+                        }
                         ex = innerExchange == null ? exchange : innerExchange;
                         value = tryConvertToSerializedType(ex, innerMmessage.getBody(),
                                 endpoint.getConfiguration().getValueSerializer());
@@ -231,7 +245,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
                     return new KeyValueHolder(
                             body,
-                            new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
+                            new ProducerRecord(
+                                    innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
                 }
 
                 @Override
@@ -257,7 +272,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         // the serializer
         Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
 
-        ProducerRecord record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
+        ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
         return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 2b812e5..765c561 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -215,6 +217,8 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
+        in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
+                LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
 
         producer.process(exchange);
 
@@ -303,6 +307,21 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void processSendsMessageWithMessageTimestampHeader() throws Exception {
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(out);
+        in.setHeader(KafkaConstants.KEY, "someKey");
+        in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
+                LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+
+        producer.process(exchange);
+
+        verifySendMessage("someTopic", "someKey");
+        assertRecordMetadataTimestampExists();
+    }
+
+    @Test
     public void processSendMessageWithTopicHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
@@ -425,6 +444,13 @@ public class KafkaProducerTest {
         assertEquals(expectedTopics, actualTopics);
     }
 
+    private void assertRecordMetadataTimestampExists() {
+        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+        assertNotNull(recordMetaData1);
+        assertEquals(recordMetaData1.size(), 1, "Expected one recordMetaData");
+        assertNotNull(recordMetaData1.get(0).timestamp());
+    }
+
     private void assertRecordMetadataExists() {
         List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
         assertNotNull(recordMetaData1);