You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 02:14:33 UTC
[flume] branch trunk updated: FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9036db906 FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp
9036db906 is described below
commit 9036db906d6fee66feee058f13d45f427b2dbd4c
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Fri Oct 7 19:14:26 2022 -0700
FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp
---
.../org/apache/flume/sink/kafka/KafkaSink.java | 64 +++++++++++++++------
.../flume/sink/kafka/KafkaSinkConstants.java | 2 +
.../org/apache/flume/sink/kafka/TestConstants.java | 2 +
.../org/apache/flume/sink/kafka/TestKafkaSink.java | 65 +++++++++++++++++++++-
.../org/apache/flume/source/kafka/KafkaSource.java | 16 +++++-
.../flume/source/kafka/KafkaSourceConstants.java | 1 +
.../source/kafka/KafkaSourceEmbeddedKafka.java | 11 +++-
.../apache/flume/source/kafka/TestKafkaSource.java | 49 ++++++++++++++++
8 files changed, 188 insertions(+), 22 deletions(-)
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index e4c9ff7e2..add6916df 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -44,12 +43,16 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -67,10 +70,12 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
@@ -122,13 +127,11 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
private Integer staticPartitionId = null;
private boolean allowTopicOverride;
private String topicHeader = null;
+ private String timestampHeader = null;
+ private Map<String, String> headerMap;
- private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
- Optional.absent();
- private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
- Optional.absent();
- private Optional<ByteArrayOutputStream> tempOutStream = Optional
- .absent();
+ private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
+ private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
//Fine to use null for initial value, Avro will create new ones if this
// is null
@@ -196,7 +199,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
if (logger.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
- + new String(eventBody, "UTF-8"));
+ + new String(eventBody, StandardCharsets.UTF_8));
} else {
logger.trace("{Event} " + eventTopic + " : " + eventKey);
}
@@ -219,12 +222,38 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
partitionId = Integer.parseInt(headerVal);
}
}
+ Long timestamp = null;
+ if (timestampHeader != null) {
+ String value = headers.get(timestampHeader);
+ if (value != null) {
+ try {
+ timestamp = Long.parseLong(value);
+ } catch (Exception ex) {
+ logger.warn("Invalid timestamp in header {} - {}", timestampHeader, value);
+ }
+ }
+ }
+ List<Header> kafkaHeaders = null;
+ if (!headerMap.isEmpty()) {
+ List<Header> tempHeaders = new ArrayList<>();
+ for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+ String value = headers.get(entry.getKey());
+ if (value != null) {
+ tempHeaders.add(new RecordHeader(entry.getValue(),
+ value.getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+ if (!tempHeaders.isEmpty()) {
+ kafkaHeaders = tempHeaders;
+ }
+ }
+
if (partitionId != null) {
- record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
- serializeEvent(event, useAvroEventFormat));
+ record = new ProducerRecord<>(eventTopic, partitionId, timestamp, eventKey,
+ serializeEvent(event, useAvroEventFormat), kafkaHeaders);
} else {
- record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
- serializeEvent(event, useAvroEventFormat));
+ record = new ProducerRecord<>(eventTopic, null, timestamp, eventKey,
+ serializeEvent(event, useAvroEventFormat), kafkaHeaders);
}
kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
} catch (NumberFormatException ex) {
@@ -247,7 +276,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
}
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
- counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
+ counter.addToEventDrainSuccessCount(kafkaFutures.size());
}
transaction.commit();
@@ -256,7 +285,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
String errorMsg = "Failed to publish events";
logger.error("Failed to publish events", ex);
counter.incrementEventWriteOrChannelFail(ex);
- result = Status.BACKOFF;
if (transaction != null) {
try {
kafkaFutures.clear();
@@ -280,7 +308,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
@Override
public synchronized void start() {
// instantiate the producer
- producer = new KafkaProducer<String,byte[]>(kafkaProps);
+ producer = new KafkaProducer<>(kafkaProps);
counter.start();
super.start();
}
@@ -305,7 +333,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
* 3. We add the sink's documented parameters which can override other
* properties
*
- * @param context
+ * @param context The Context.
*/
@Override
public void configure(Context context) {
@@ -322,6 +350,10 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
topic = topicStr;
+ timestampHeader = context.getString(TIMESTAMP_HEADER);
+
+ headerMap = context.getSubProperties(KAFKA_HEADER);
+
batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
if (logger.isDebugEnabled()) {
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index ffca3df72..3f1f279e0 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -35,8 +35,10 @@ public class KafkaSinkConstants {
public static final String KEY_HEADER = "key";
public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
+ public static final String TIMESTAMP_HEADER = "timestampHeader";
public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride";
public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true;
+ public static final String KAFKA_HEADER = "header.";
public static final String AVRO_EVENT = "useFlumeEventFormat";
public static final boolean DEFAULT_AVRO_EVENT = false;
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 8d6dce74c..672adc900 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -25,4 +25,6 @@ public class TestConstants {
public static final String CUSTOM_TOPIC = "custom-topic";
public static final String HEADER_1_VALUE = "test-avro-header";
public static final String HEADER_1_KEY = "header1";
+ public static final String KAFKA_HEADER_1 = "FLUME_CORRELATOR";
+ public static final String KAFKA_HEADER_2 = "FLUME_METHOD";
}
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 5a69b16fb..fd5130ca9 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -43,6 +43,9 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -51,6 +54,7 @@ import org.mockito.internal.util.reflection.Whitebox;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -185,13 +189,19 @@ public class TestKafkaSink {
}
private void checkMessageArrived(String msg, String topic) {
- ConsumerRecords recs = pollConsumerRecords(topic);
+ checkMessageArrived(msg, topic, null, null);
+ }
+
+ private void checkMessageArrived(String msg, String topic, Long timestamp, Headers headers) {
+ ConsumerRecords<String, String> recs = pollConsumerRecords(topic);
assertNotNull(recs);
assertTrue(recs.count() > 0);
- Iterator<ConsumerRecord> iter = recs.records(topic).iterator();
+ Iterator<ConsumerRecord<String, String>> iter = recs.records(topic).iterator();
boolean match = false;
while (iter.hasNext()) {
- if (msg.equals(iter.next().value())) {
+ ConsumerRecord<String, String> record = iter.next();
+ if (msg.equals(record.value()) && (timestamp == null || timestamp.equals(record.timestamp()))
+ && (headers == null || validateHeaders(headers, record.headers()))) {
match = true;
break;
}
@@ -199,6 +209,10 @@ public class TestKafkaSink {
assertTrue("No message matches " + msg, match);
}
+ private boolean validateHeaders(Headers expected, Headers actual) {
+ return expected.equals(actual);
+ }
+
@Test
public void testStaticTopic() {
Context context = prepareDefaultContext();
@@ -251,6 +265,51 @@ public class TestKafkaSink {
checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
}
+ @Test
+ public void testTimestampAndHeaders() {
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ context.put(KafkaSinkConstants.TIMESTAMP_HEADER, "timestamp");
+ context.put("header.correlator", TestConstants.KAFKA_HEADER_1);
+ context.put("header.method", TestConstants.KAFKA_HEADER_2);
+
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "test-topic-and-key-from-header";
+ Map<String, String> headers = new HashMap<String, String>();
+ long now = System.currentTimeMillis();
+ headers.put("timestamp", Long.toString(now));
+ headers.put("topic", TestConstants.CUSTOM_TOPIC);
+ headers.put("key", TestConstants.CUSTOM_KEY);
+ headers.put("correlator", "12345");
+ headers.put("method", "testTimestampAndHeaders");
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes(), headers);
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+ Headers expected = new RecordHeaders();
+ expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_1,
+ "12345".getBytes(StandardCharsets.UTF_8)));
+ expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_2,
+ "testTimestampAndHeaders".getBytes(StandardCharsets.UTF_8)));
+ checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC, now, expected);
+ }
+
/**
* Tests that a message will be produced to a topic as specified by a
* custom topicHeader parameter (FLUME-3046).
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 7a64ed742..2bd19757d 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -48,6 +48,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -89,6 +91,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOP
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
@@ -164,6 +167,7 @@ public class KafkaSource extends AbstractPollableSource
private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
private String topicHeader = null;
private boolean setTopicHeader;
+ private Map<String, String> headerMap;
@Override
public long getBatchSize() {
@@ -285,7 +289,15 @@ public class KafkaSource extends AbstractPollableSource
// Add headers to event (timestamp, topic, partition, key) only if they don't exist
if (!headers.containsKey(TIMESTAMP_HEADER)) {
- headers.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
+ headers.put(TIMESTAMP_HEADER, String.valueOf(message.timestamp()));
+ }
+ if (!headerMap.isEmpty()) {
+ Headers kafkaHeaders = message.headers();
+ for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+ for (Header kafkaHeader : kafkaHeaders.headers(entry.getValue())) {
+ headers.put(entry.getKey(), new String(kafkaHeader.value()));
+ }
+ }
}
// Only set the topic header if setTopicHeader and it isn't already populated
if (setTopicHeader && !headers.containsKey(topicHeader)) {
@@ -440,6 +452,8 @@ public class KafkaSource extends AbstractPollableSource
topicHeader = context.getString(TOPIC_HEADER, DEFAULT_TOPIC_HEADER);
+ headerMap = context.getSubProperties(KAFKA_HEADER);
+
setConsumerProps(context);
if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 8ac437add..5c7857d1b 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -36,6 +36,7 @@ public class KafkaSourceConstants {
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final int DEFAULT_BATCH_DURATION = 1000;
public static final String DEFAULT_GROUP_ID = "flume";
+ public static final String KAFKA_HEADER = "header.";
public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 397af64cf..b2deea9c9 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -142,7 +143,7 @@ public class KafkaSourceEmbeddedKafka {
}
public void produce(String topic, String k, byte[] v) {
- ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, k, v);
+ ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, k, v);
try {
producer.send(rec).get();
} catch (InterruptedException e) {
@@ -157,7 +158,13 @@ public class KafkaSourceEmbeddedKafka {
}
public void produce(String topic, int partition, String k, byte[] v) {
- ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, partition, k, v);
+ this.produce(topic, partition, null, k, v, null);
+ }
+
+ public void produce(String topic, int partition, Long timestamp, String k, byte[] v,
+ Headers headers) {
+ ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, partition, timestamp, k, v,
+ headers);
try {
producer.send(rec).get();
} catch (InterruptedException e) {
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index ee913c9e6..d5caf7ceb 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -44,6 +44,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
@@ -63,7 +66,10 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -834,6 +840,49 @@ public class TestKafkaSource {
events.clear();
}
+
+ /**
+ * Tests that Kafka record headers and the record timestamp are propagated
+ * into the Flume event headers, along with the custom topic header,
+ * based on the configuration parameters added in FLUME-3435 and FLUME-3046
+ * @throws InterruptedException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testTopicKafkaHeaderSet() throws InterruptedException, EventDeliveryException {
+ final String correlatorHeader = "FLUME_CORRELATOR";
+ context.put(TOPICS, topic0);
+ context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
+ context.put(KafkaSourceConstants.KAFKA_HEADER + "correlator", correlatorHeader);
+ context.put(TIMESTAMP_HEADER, "true");
+ kafkaSource.configure(context);
+
+ startKafkaSource();
+
+ Thread.sleep(500L);
+
+ long date = ZonedDateTime.of(2022, 10, 7, 8, 0, 0, 0,
+ ZoneId.systemDefault()).toInstant().toEpochMilli();
+ Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader(correlatorHeader, "12345".getBytes(StandardCharsets.UTF_8)));
+ kafkaServer.produce(topic0, 0, date, "", "hello, world2".getBytes(StandardCharsets.UTF_8),
+ headers);
+
+ Thread.sleep(500L);
+
+ Status status = kafkaSource.process();
+ assertEquals(Status.READY, status);
+ Assert.assertEquals("hello, world2", new String(events.get(0).getBody(),
+ Charsets.UTF_8));
+ Map<String, String> flumeHeaders = events.get(0).getHeaders();
+ Assert.assertEquals(Long.toString(date), flumeHeaders.get("timestamp"));
+ Assert.assertEquals(topic0, flumeHeaders.get("customTopicHeader"));
+ Assert.assertEquals("12345", flumeHeaders.get("correlator"));
+
+ kafkaSource.stop();
+ events.clear();
+ }
+
+
/**
* Tests the unavailability of the topic header in the output events,
* based on the configuration parameters added in FLUME-3046