You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/03/15 13:19:24 UTC
[nifi] branch master updated: NIFI-5267 Kafka support to replay by
timestamp
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 45ebeba NIFI-5267 Kafka support to replay by timestamp
45ebeba is described below
commit 45ebeba846cf257c0dd27669c25e244208758fb9
Author: Sandish Kumar <sa...@phdata.io>
AuthorDate: Wed Mar 13 17:35:51 2019 -0500
NIFI-5267 Kafka support to replay by timestamp
This closes #3372.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java | 1 +
.../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++++
.../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java | 1 +
.../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java | 1 +
.../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++++
.../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java | 1 +
.../apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java | 1 +
.../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java | 1 +
.../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++++
.../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java | 1 +
.../apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java | 1 +
.../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java | 1 +
.../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++++
.../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java | 1 +
14 files changed, 26 insertions(+)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 0d80e76..4bc26a1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -63,6 +63,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 26562b9..85c85f6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -442,6 +442,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
// And continue to the next message.
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
@@ -566,6 +567,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
@@ -590,6 +592,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private static class BundleTracker {
final long initialOffset;
+ final long initialTimestamp;
final int partition;
final String topic;
final String key;
@@ -603,6 +606,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
+ this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
+ static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
index 1a627dc..5ac2dfc 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
@@ -64,6 +64,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 815ed18..c61c549 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private static class BundleTracker {
final long initialOffset;
+ final long initialTimestamp;
final int partition;
final String topic;
final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
+ this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index b9a6ae1..37c6599 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
+ static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
index 9a831db..28a582c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -68,6 +68,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
index 0245352..511d85f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
@@ -64,6 +64,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index ac88a4c..f6b0d94 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private static class BundleTracker {
final long initialOffset;
+ final long initialTimestamp;
final int partition;
final String topic;
final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
+ this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
+ static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 2cafd97..fb31e32 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -69,6 +69,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index 758ac87..13bebc9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -66,6 +66,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+ @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 5c32c82..2a90219 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private static class BundleTracker {
final long initialOffset;
+ final long initialTimestamp;
final int partition;
final String topic;
final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
+ this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
+ static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");