You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/10 06:01:06 UTC
[camel] branch master updated: "CAMEL-16414:camel-kafka set custom
timestamp" (#5312)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3f0c9af "CAMEL-16414:camel-kafka set custom timestamp" (#5312)
3f0c9af is described below
commit 3f0c9afa0ee30e62f1aea8a6dd08412b8aadbb9b
Author: Ramu <kr...@gmail.com>
AuthorDate: Sat Apr 10 11:30:40 2021 +0530
"CAMEL-16414:camel-kafka set custom timestamp" (#5312)
Co-authored-by: Kodanda Ramu Kakarla <kk...@kkakarla.pnq.csb>
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 1 +
.../camel/component/kafka/KafkaConstants.java | 1 +
.../camel/component/kafka/KafkaProducer.java | 19 ++++++++++++++--
.../camel/component/kafka/KafkaProducerTest.java | 26 ++++++++++++++++++++++
4 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 397f064..da2db71 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -310,6 +310,7 @@ Before sending a message to Kafka you can configure the following headers.
| Header constant | Header value | Type | Description
| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition
| KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved.
+| KafkaConstants.OVERRIDE_TIMESTAMP | "kafka.OVERRIDE_TIMESTAMP" | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved.
| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition
|===
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 785fc21..edc2dcb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -28,6 +28,7 @@ public final class KafkaConstants {
public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD";
public static final String TIMESTAMP = "kafka.TIMESTAMP";
+ public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP";
@Deprecated
public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 32aa440..f720f2b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -146,6 +146,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
@SuppressWarnings({ "unchecked", "rawtypes" })
protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
String topic = endpoint.getConfiguration().getTopic();
+ Long timeStamp = null;
// must remove header so its not propagated
Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
@@ -160,6 +161,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
}
+ Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+ if ((overrideTimeStamp != null) && (overrideTimeStamp instanceof Long)) {
+ LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
+ timeStamp = ((Long) overrideTimeStamp).longValue();
+ }
+
// extracting headers which need to be propagated
List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
@@ -189,6 +196,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
String innerTopic = msgTopic;
Object innerKey = null;
Integer innerPartitionKey = null;
+ Long innerTimestamp = null;
Object value = next;
Exchange ex = null;
@@ -223,6 +231,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
}
+ if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) {
+ if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) instanceof Long) {
+ innerTimestamp
+ = ((Long) innerMmessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP)).longValue();
+ }
+ }
ex = innerExchange == null ? exchange : innerExchange;
value = tryConvertToSerializedType(ex, innerMmessage.getBody(),
endpoint.getConfiguration().getValueSerializer());
@@ -231,7 +245,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
return new KeyValueHolder(
body,
- new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders));
+ new ProducerRecord(
+ innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders));
}
@Override
@@ -257,7 +272,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
// the serializer
Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
- ProducerRecord record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
+ ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 2b812e5..765c561 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.kafka;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -215,6 +217,8 @@ public class KafkaProducerTest {
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
in.setHeader(KafkaConstants.KEY, "someKey");
+ in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
+ LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
producer.process(exchange);
@@ -303,6 +307,21 @@ public class KafkaProducerTest {
}
@Test
+ public void processSendsMessageWithMessageTimestampHeader() throws Exception {
+ endpoint.getConfiguration().setTopic("someTopic");
+ Mockito.when(exchange.getIn()).thenReturn(in);
+ Mockito.when(exchange.getMessage()).thenReturn(out);
+ in.setHeader(KafkaConstants.KEY, "someKey");
+ in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
+ LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+
+ producer.process(exchange);
+
+ verifySendMessage("someTopic", "someKey");
+ assertRecordMetadataTimestampExists();
+ }
+
+ @Test
public void processSendMessageWithTopicHeader() throws Exception {
endpoint.getConfiguration().setTopic("someTopic");
Mockito.when(exchange.getIn()).thenReturn(in);
@@ -425,6 +444,13 @@ public class KafkaProducerTest {
assertEquals(expectedTopics, actualTopics);
}
+ private void assertRecordMetadataTimestampExists() {
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
+ assertNotNull(recordMetaData1);
+ assertEquals(recordMetaData1.size(), 1, "Expected one recordMetaData");
+ assertNotNull(recordMetaData1.get(0).timestamp());
+ }
+
private void assertRecordMetadataExists() {
List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
assertNotNull(recordMetaData1);