You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/04/13 08:20:18 UTC

[camel] branch master updated: CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 952b4a8  CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337)
952b4a8 is described below

commit 952b4a81df9c50d5bb0df63785b32283e1cdcbeb
Author: Ramu <kr...@gmail.com>
AuthorDate: Tue Apr 13 13:49:48 2021 +0530

    CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337)
    
    Co-authored-by: Kodanda Ramu Kakarla <kk...@kkakarla.pnq.csb>
---
 .../src/main/docs/vertx-kafka-component.adoc       |  1 +
 .../kafka/VertxKafkaConfigurationOptionsProxy.java | 11 +++++++
 .../component/vertx/kafka/VertxKafkaConstants.java |  1 +
 .../operations/VertxKafkaProducerOperations.java   | 16 +++++++++-
 .../VertxKafkaProducerOperationsTest.java          | 35 ++++++++++++++++++++++
 5 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
index 9eaa5ed..6c86bf5 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
@@ -343,6 +343,7 @@ Before sending a message to Kafka you can configure the following headers.
 |`CamelVertxKafkaMessageKey`| `VertxKafkaConstants.MESSAGE_KEY`|`String`| Explicitly specify the message key, if partition ID is not specified, this will trigger the messages to go into the same partition.
 |`CamelVertxKafkaTopic`| `VertxKafkaConstants.TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will be *preserved* in case of header aggregation.
 |`CamelVertxKafkaOverrideTopic`| `VertxKafkaConstants.OVERRIDE_TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will *not be preserved* in case of header aggregation and it will take *precedence* over `CamelVertxKafkaTopic`.
+| `CamelVertxKafkaOverrideTimestamp` | `VertxKafkaConstants.OVERRIDE_TIMESTAMP` | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the  record with the provided timestamp and the header is not preserved. 
 |=======================================================================
 
 If you want to send a message to a dynamic topic then use `VertxKafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
index 253c9f6..6ab88e9 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
@@ -46,6 +46,17 @@ public class VertxKafkaConfigurationOptionsProxy {
         return configuration.getValueSerializer();
     }
 
+    public Object getOverrideTimestamp(final Message message) {
+        Object timestamp = getOption(message, VertxKafkaConstants.OVERRIDE_TIMESTAMP, () -> null, Object.class);
+        if (ObjectHelper.isNotEmpty(timestamp)) {
+            // must remove header so its not propagated
+            message.removeHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP);
+        }
+
+        return timestamp;
+
+    }
+
     public String getTopic(final Message message) {
         return getOption(message, VertxKafkaConstants.TOPIC, configuration::getTopic, String.class);
     }
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
index b822097..30a3da0 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
@@ -28,6 +28,7 @@ public final class VertxKafkaConstants {
     public static final String OFFSET = HEADER_PREFIX + "Offset";
     public static final String HEADERS = HEADER_PREFIX + "Headers";
     public static final String TIMESTAMP = HEADER_PREFIX + "Timestamp";
+    public static final String OVERRIDE_TIMESTAMP = HEADER_PREFIX + "OverrideTimestamp";
     public static final String MANUAL_COMMIT = HEADER_PREFIX + "ManualCommit";
     // headers evaluated by the producer only
     public static final String OVERRIDE_TOPIC = HEADER_PREFIX + "OverrideTopic";
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
index d29460c..b27dc9e 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
@@ -142,11 +142,12 @@ public class VertxKafkaProducerOperations {
         final Object messageKey = getMessageKey(message);
         final Object messageValue = getMessageValue(message, inputData);
         final Integer partitionId = getPartitionId(message);
+        final Long overrideTimestamp = getOverrideTimestamp(message);
         final List<KafkaHeader> propagatedHeaders
                 = new VertxKafkaHeadersPropagation(configurationOptionsProxy.getConfiguration().getHeaderFilterStrategy())
                         .getPropagatedHeaders(message);
 
-        return KafkaProducerRecord.create(topic, messageKey, messageValue, partitionId)
+        return KafkaProducerRecord.create(topic, messageKey, messageValue, overrideTimestamp, partitionId)
                 .addHeaders(propagatedHeaders);
     }
 
@@ -189,4 +190,17 @@ public class VertxKafkaProducerOperations {
         return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, inputData,
                 configurationOptionsProxy.getValueSerializer(message));
     }
+
+    private Long getOverrideTimestamp(final Message message) {
+
+        Object timeStamp = configurationOptionsProxy.getOverrideTimestamp(message);
+        Long overrideTimestamp = null;
+        if (ObjectHelper.isNotEmpty(timeStamp)) {
+            overrideTimestamp = ((Long) timeStamp).longValue();
+        }
+
+        return overrideTimestamp;
+
+    }
+
 }
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
index eeee1d8..bc4f5d2 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.vertx.kafka.operations;
 
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -274,6 +276,27 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport {
     }
 
     @Test
+    void testSendEventWithOverrideTopicHeaderAndTimestamp() {
+        configuration.setTopic("sometopic");
+        Long timestamp = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+
+        final Message message = createMessage();
+
+        message.setHeader(VertxKafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+        message.setHeader(VertxKafkaConstants.MESSAGE_KEY, "someKey");
+        message.setHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP, timestamp);
+        message.setBody("test");
+
+        sendEvent(message);
+
+        // the header is now removed
+        assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+        assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP));
+
+        verifySendMessage("anotherTopic", "someKey", timestamp, "test");
+    }
+
+    @Test
     void testSendEventWithNoTopicSet() {
         configuration.setTopic(null);
 
@@ -539,6 +562,18 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport {
         });
     }
 
+    private void verifySendMessage(
+            final String topic, final Object messageKey, final Long timestamp, final Object messageBody) {
+        assertProducedMessages(records -> {
+            assertEquals(1, records.size());
+            assertEquals(topic, records.get(0).topic());
+            assertEquals(messageKey, records.get(0).key());
+            assertEquals(messageBody, records.get(0).value());
+            assertEquals(timestamp, records.get(0).timestamp());
+
+        });
+    }
+
     private void assertProducedMessages(final Consumer<List<ProducerRecord<Object, Object>>> recordsFn) {
         Awaitility
                 .await()