You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2021/04/13 08:20:18 UTC
[camel] branch master updated: CAMEL-16481:camel-vertx-kafka set
custom timestamp (#5337)
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 952b4a8 CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337)
952b4a8 is described below
commit 952b4a81df9c50d5bb0df63785b32283e1cdcbeb
Author: Ramu <kr...@gmail.com>
AuthorDate: Tue Apr 13 13:49:48 2021 +0530
CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337)
Co-authored-by: Kodanda Ramu Kakarla <kk...@kkakarla.pnq.csb>
---
.../src/main/docs/vertx-kafka-component.adoc | 1 +
.../kafka/VertxKafkaConfigurationOptionsProxy.java | 11 +++++++
.../component/vertx/kafka/VertxKafkaConstants.java | 1 +
.../operations/VertxKafkaProducerOperations.java | 16 +++++++++-
.../VertxKafkaProducerOperationsTest.java | 35 ++++++++++++++++++++++
5 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
index 9eaa5ed..6c86bf5 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc
@@ -343,6 +343,7 @@ Before sending a message to Kafka you can configure the following headers.
|`CamelVertxKafkaMessageKey`| `VertxKafkaConstants.MESSAGE_KEY`|`String`| Explicitly specify the message key, if partition ID is not specified, this will trigger the messages to go into the same partition.
|`CamelVertxKafkaTopic`| `VertxKafkaConstants.TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will be *preserved* in case of header aggregation.
|`CamelVertxKafkaOverrideTopic`| `VertxKafkaConstants.OVERRIDE_TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will *not be preserved* in case of header aggregation and it will take *precedence* over `CamelVertxKafkaTopic`.
+| `CamelVertxKafkaOverrideTimestamp` | `VertxKafkaConstants.OVERRIDE_TIMESTAMP` | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.
|=======================================================================
If you want to send a message to a dynamic topic then use `VertxKafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
index 253c9f6..6ab88e9 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java
@@ -46,6 +46,17 @@ public class VertxKafkaConfigurationOptionsProxy {
return configuration.getValueSerializer();
}
+ public Object getOverrideTimestamp(final Message message) {
+ Object timestamp = getOption(message, VertxKafkaConstants.OVERRIDE_TIMESTAMP, () -> null, Object.class);
+ if (ObjectHelper.isNotEmpty(timestamp)) {
+ // must remove header so its not propagated
+ message.removeHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP);
+ }
+
+ return timestamp;
+
+ }
+
public String getTopic(final Message message) {
return getOption(message, VertxKafkaConstants.TOPIC, configuration::getTopic, String.class);
}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
index b822097..30a3da0 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java
@@ -28,6 +28,7 @@ public final class VertxKafkaConstants {
public static final String OFFSET = HEADER_PREFIX + "Offset";
public static final String HEADERS = HEADER_PREFIX + "Headers";
public static final String TIMESTAMP = HEADER_PREFIX + "Timestamp";
+ public static final String OVERRIDE_TIMESTAMP = HEADER_PREFIX + "OverrideTimestamp";
public static final String MANUAL_COMMIT = HEADER_PREFIX + "ManualCommit";
// headers evaluated by the producer only
public static final String OVERRIDE_TOPIC = HEADER_PREFIX + "OverrideTopic";
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
index d29460c..b27dc9e 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java
@@ -142,11 +142,12 @@ public class VertxKafkaProducerOperations {
final Object messageKey = getMessageKey(message);
final Object messageValue = getMessageValue(message, inputData);
final Integer partitionId = getPartitionId(message);
+ final Long overrideTimestamp = getOverrideTimestamp(message);
final List<KafkaHeader> propagatedHeaders
= new VertxKafkaHeadersPropagation(configurationOptionsProxy.getConfiguration().getHeaderFilterStrategy())
.getPropagatedHeaders(message);
- return KafkaProducerRecord.create(topic, messageKey, messageValue, partitionId)
+ return KafkaProducerRecord.create(topic, messageKey, messageValue, overrideTimestamp, partitionId)
.addHeaders(propagatedHeaders);
}
@@ -189,4 +190,17 @@ public class VertxKafkaProducerOperations {
return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, inputData,
configurationOptionsProxy.getValueSerializer(message));
}
+
+ private Long getOverrideTimestamp(final Message message) {
+
+ Object timeStamp = configurationOptionsProxy.getOverrideTimestamp(message);
+ Long overrideTimestamp = null;
+ if (ObjectHelper.isNotEmpty(timeStamp)) {
+ overrideTimestamp = ((Long) timeStamp).longValue();
+ }
+
+ return overrideTimestamp;
+
+ }
+
}
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
index eeee1d8..bc4f5d2 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.vertx.kafka.operations;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -274,6 +276,27 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport {
}
@Test
+ void testSendEventWithOverrideTopicHeaderAndTimestamp() {
+ configuration.setTopic("sometopic");
+ Long timestamp = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+
+ final Message message = createMessage();
+
+ message.setHeader(VertxKafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
+ message.setHeader(VertxKafkaConstants.MESSAGE_KEY, "someKey");
+ message.setHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP, timestamp);
+ message.setBody("test");
+
+ sendEvent(message);
+
+ // the header is now removed
+ assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TOPIC));
+ assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP));
+
+ verifySendMessage("anotherTopic", "someKey", timestamp, "test");
+ }
+
+ @Test
void testSendEventWithNoTopicSet() {
configuration.setTopic(null);
@@ -539,6 +562,18 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport {
});
}
+ private void verifySendMessage(
+ final String topic, final Object messageKey, final Long timestamp, final Object messageBody) {
+ assertProducedMessages(records -> {
+ assertEquals(1, records.size());
+ assertEquals(topic, records.get(0).topic());
+ assertEquals(messageKey, records.get(0).key());
+ assertEquals(messageBody, records.get(0).value());
+ assertEquals(timestamp, records.get(0).timestamp());
+
+ });
+ }
+
private void assertProducedMessages(final Consumer<List<ProducerRecord<Object, Object>>> recordsFn) {
Awaitility
.await()