Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:24 UTC

[flume] 11/14: FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git

commit ac5ed049f96213e9b9b7f1c0a111781c19a411ff
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Fri Oct 7 19:14:26 2022 -0700

    FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp
---
 .../org/apache/flume/sink/kafka/KafkaSink.java     | 64 +++++++++++++++------
 .../flume/sink/kafka/KafkaSinkConstants.java       |  2 +
 .../org/apache/flume/sink/kafka/TestConstants.java |  2 +
 .../org/apache/flume/sink/kafka/TestKafkaSink.java | 65 +++++++++++++++++++++-
 .../org/apache/flume/source/kafka/KafkaSource.java | 16 +++++-
 .../flume/source/kafka/KafkaSourceConstants.java   |  1 +
 .../source/kafka/KafkaSourceEmbeddedKafka.java     | 11 +++-
 .../apache/flume/source/kafka/TestKafkaSource.java | 49 ++++++++++++++++
 8 files changed, 188 insertions(+), 22 deletions(-)
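
The new behavior is driven entirely by context properties. As a minimal
sketch mirroring the tests below (the header names "timestamp",
"correlator", and "method" are hypothetical; the property keys come from
KafkaSinkConstants in this diff), the sink-side options are wired like so:

    import org.apache.flume.Context;

    public class SinkConfigSketch {
      public static void main(String[] args) {
        Context context = new Context();
        // Flume header whose value (epoch millis) becomes the Kafka record
        // timestamp (KafkaSinkConstants.TIMESTAMP_HEADER = "timestampHeader").
        context.put("timestampHeader", "timestamp");
        // "header." sub-properties (KafkaSinkConstants.KAFKA_HEADER) map a
        // Flume header name (the key suffix) to a Kafka record header name
        // (the value).
        context.put("header.correlator", "FLUME_CORRELATOR");
        context.put("header.method", "FLUME_METHOD");
      }
    }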

diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index e4c9ff7e2..add6916df 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -44,12 +43,16 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -67,10 +70,12 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
@@ -122,13 +127,11 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
   private Integer staticPartitionId = null;
   private boolean allowTopicOverride;
   private String topicHeader = null;
+  private String timestampHeader = null;
+  private Map<String, String> headerMap;
 
-  private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
-          Optional.absent();
-  private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
-          Optional.absent();
-  private Optional<ByteArrayOutputStream> tempOutStream = Optional
-          .absent();
+  private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
+  private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
 
   //Fine to use null for initial value, Avro will create new ones if this
   // is null
@@ -196,7 +199,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
         if (logger.isTraceEnabled()) {
           if (LogPrivacyUtil.allowLogRawData()) {
             logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
-                + new String(eventBody, "UTF-8"));
+                + new String(eventBody, StandardCharsets.UTF_8));
           } else {
             logger.trace("{Event} " + eventTopic + " : " + eventKey);
           }
@@ -219,12 +222,38 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
               partitionId = Integer.parseInt(headerVal);
             }
           }
+          Long timestamp = null;
+          if (timestampHeader != null) {
+            String value = headers.get(timestampHeader);
+            if (value != null) {
+              try {
+                timestamp = Long.parseLong(value);
+              } catch (Exception ex) {
+                logger.warn("Invalid timestamp in header {} - {}", timestampHeader, value);
+              }
+            }
+          }
+          List<Header> kafkaHeaders = null;
+          if (!headerMap.isEmpty()) {
+            List<Header> tempHeaders = new ArrayList<>();
+            for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+              String value = headers.get(entry.getKey());
+              if (value != null) {
+                tempHeaders.add(new RecordHeader(entry.getValue(),
+                    value.getBytes(StandardCharsets.UTF_8)));
+              }
+            }
+            if (!tempHeaders.isEmpty()) {
+              kafkaHeaders = tempHeaders;
+            }
+          }
+
           if (partitionId != null) {
-            record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
-                serializeEvent(event, useAvroEventFormat));
+            record = new ProducerRecord<>(eventTopic, partitionId, timestamp, eventKey,
+                serializeEvent(event, useAvroEventFormat), kafkaHeaders);
           } else {
-            record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
-                serializeEvent(event, useAvroEventFormat));
+            record = new ProducerRecord<>(eventTopic, null, timestamp, eventKey,
+                serializeEvent(event, useAvroEventFormat), kafkaHeaders);
           }
           kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
         } catch (NumberFormatException ex) {
@@ -247,7 +276,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
         }
         long endTime = System.nanoTime();
         counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
-        counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
+        counter.addToEventDrainSuccessCount(kafkaFutures.size());
       }
 
       transaction.commit();
@@ -256,7 +285,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
       String errorMsg = "Failed to publish events";
       logger.error("Failed to publish events", ex);
       counter.incrementEventWriteOrChannelFail(ex);
-      result = Status.BACKOFF;
       if (transaction != null) {
         try {
           kafkaFutures.clear();
@@ -280,7 +308,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
   @Override
   public synchronized void start() {
     // instantiate the producer
-    producer = new KafkaProducer<String,byte[]>(kafkaProps);
+    producer = new KafkaProducer<>(kafkaProps);
     counter.start();
     super.start();
   }
@@ -305,7 +333,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
    * 3. We add the sink's documented parameters which can override other
    * properties
    *
-   * @param context
+   * @param context The Context.
    */
   @Override
   public void configure(Context context) {
@@ -322,6 +350,10 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu
 
     topic = topicStr;
 
+    timestampHeader = context.getString(TIMESTAMP_HEADER);
+
+    headerMap = context.getSubProperties(KAFKA_HEADER);
+
     batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
 
     if (logger.isDebugEnabled()) {
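
With the configuration above, the send path builds richer records. A hedged
illustration of what the new code produces for an event carrying the headers
timestamp=1665187200000 and correlator=12345 (the topic name and values are
hypothetical):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    public class SinkRecordSketch {
      public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // The mapped Flume header "correlator" becomes the Kafka header
        // "FLUME_CORRELATOR"; unmapped Flume headers are not forwarded.
        List<Header> kafkaHeaders = Collections.singletonList(
            new RecordHeader("FLUME_CORRELATOR",
                "12345".getBytes(StandardCharsets.UTF_8)));
        // The timestamp slots into the six-argument ProducerRecord
        // constructor; partition stays null unless a partition header or
        // static partition id supplies one.
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
            "mytopic", null, 1665187200000L, null, body, kafkaHeaders);
        System.out.println(record);
      }
    }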
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index ffca3df72..3f1f279e0 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -35,8 +35,10 @@ public class KafkaSinkConstants {
   public static final String KEY_HEADER = "key";
   public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
   public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
+  public static final String TIMESTAMP_HEADER = "timestampHeader";
   public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride";
   public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true;
+  public static final String KAFKA_HEADER = "header.";
 
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 8d6dce74c..672adc900 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -25,4 +25,6 @@ public class TestConstants {
   public static final String CUSTOM_TOPIC = "custom-topic";
   public static final String HEADER_1_VALUE = "test-avro-header";
   public static final String HEADER_1_KEY = "header1";
+  public static final String KAFKA_HEADER_1 = "FLUME_CORRELATOR";
+  public static final String KAFKA_HEADER_2 = "FLUME_METHOD";
 }
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 5a69b16fb..fd5130ca9 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -43,6 +43,9 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -51,6 +54,7 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -185,13 +189,19 @@ public class TestKafkaSink {
   }
 
   private void checkMessageArrived(String msg, String topic) {
-    ConsumerRecords recs = pollConsumerRecords(topic);
+    checkMessageArrived(msg, topic, null, null);
+  }
+
+  private void checkMessageArrived(String msg, String topic, Long timestamp, Headers headers) {
+    ConsumerRecords<String, String> recs = pollConsumerRecords(topic);
     assertNotNull(recs);
     assertTrue(recs.count() > 0);
-    Iterator<ConsumerRecord> iter = recs.records(topic).iterator();
+    Iterator<ConsumerRecord<String, String>> iter = recs.records(topic).iterator();
     boolean match = false;
     while (iter.hasNext()) {
-      if (msg.equals(iter.next().value())) {
+      ConsumerRecord<String, String> record = iter.next();
+      if (msg.equals(record.value()) && (timestamp == null || timestamp.equals(record.timestamp()))
+          && (headers == null || validateHeaders(headers, record.headers()))) {
         match = true;
         break;
       }
@@ -199,6 +209,10 @@ public class TestKafkaSink {
     assertTrue("No message matches " + msg, match);
   }
 
+  private boolean validateHeaders(Headers expected, Headers actual) {
+    return expected.equals(actual);
+  }
+
   @Test
   public void testStaticTopic() {
     Context context = prepareDefaultContext();
@@ -251,6 +265,51 @@ public class TestKafkaSink {
     checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
   }
 
+  @Test
+  public void testTimestampAndHeaders() {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.TIMESTAMP_HEADER, "timestamp");
+    context.put("header.correlator", TestConstants.KAFKA_HEADER_1);
+    context.put("header.method", TestConstants.KAFKA_HEADER_2);
+
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-topic-and-key-from-header";
+    Map<String, String> headers = new HashMap<String, String>();
+    long now = System.currentTimeMillis();
+    headers.put("timestamp", Long.toString(now));
+    headers.put("topic", TestConstants.CUSTOM_TOPIC);
+    headers.put("key", TestConstants.CUSTOM_KEY);
+    headers.put("correlator", "12345");
+    headers.put("method", "testTimestampAndHeaders");
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+    Headers expected = new RecordHeaders();
+    expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_1,
+        "12345".getBytes(StandardCharsets.UTF_8)));
+    expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_2,
+        "testTimestampAndHeaders".getBytes(StandardCharsets.UTF_8)));
+    checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC, now, expected);
+  }
+
   /**
    * Tests that a message will be produced to a topic as specified by a
    * custom topicHeader parameter (FLUME-3046).
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 7a64ed742..2bd19757d 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -48,6 +48,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -89,6 +91,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOP
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER;
@@ -164,6 +167,7 @@ public class KafkaSource extends AbstractPollableSource
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
   private String topicHeader = null;
   private boolean setTopicHeader;
+  private Map<String, String> headerMap;
 
   @Override
   public long getBatchSize() {
@@ -285,7 +289,15 @@ public class KafkaSource extends AbstractPollableSource
 
         // Add headers to event (timestamp, topic, partition, key) only if they don't exist
         if (!headers.containsKey(TIMESTAMP_HEADER)) {
-          headers.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
+          headers.put(TIMESTAMP_HEADER, String.valueOf(message.timestamp()));
+        }
+        if (!headerMap.isEmpty()) {
+          Headers kafkaHeaders = message.headers();
+          for (Map.Entry<String, String> entry : headerMap.entrySet()) {
+            for (Header kafkaHeader : kafkaHeaders.headers(entry.getValue())) {
+              headers.put(entry.getKey(), new String(kafkaHeader.value()));
+            }
+          }
         }
         // Only set the topic header if setTopicHeader and it isn't already populated
         if (setTopicHeader && !headers.containsKey(topicHeader)) {
@@ -440,6 +452,8 @@ public class KafkaSource extends AbstractPollableSource
 
     topicHeader = context.getString(TOPIC_HEADER, DEFAULT_TOPIC_HEADER);
 
+    headerMap = context.getSubProperties(KAFKA_HEADER);
+
     setConsumerProps(context);
 
     if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 8ac437add..5c7857d1b 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -36,6 +36,7 @@ public class KafkaSourceConstants {
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int DEFAULT_BATCH_DURATION = 1000;
   public static final String DEFAULT_GROUP_ID = "flume";
+  public static final String KAFKA_HEADER = "header.";
 
   public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
   public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
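
On the source side the mapping runs in the opposite direction: the "header."
sub-property key names the Flume header to set, and its value names the Kafka
record header to read. A hedged sketch of the copy loop added to KafkaSource
above (the class and method names here are illustrative only):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;

    public class SourceHeaderSketch {
      // headerMap comes from getSubProperties("header."), i.e. Flume header
      // name -> Kafka header name. If a Kafka header key repeats, the last
      // value wins, matching the loop in KafkaSource.doProcess(). The
      // platform-default charset is used for the value, as in the commit.
      static Map<String, String> toFlumeHeaders(Map<String, String> headerMap,
          Headers kafkaHeaders) {
        Map<String, String> flumeHeaders = new HashMap<>();
        for (Map.Entry<String, String> entry : headerMap.entrySet()) {
          for (Header h : kafkaHeaders.headers(entry.getValue())) {
            flumeHeaders.put(entry.getKey(), new String(h.value()));
          }
        }
        return flumeHeaders;
      }
    }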
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 397af64cf..b2deea9c9 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -142,7 +143,7 @@ public class KafkaSourceEmbeddedKafka {
   }
 
   public void produce(String topic, String k, byte[] v) {
-    ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, k, v);
+    ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, k, v);
     try {
       producer.send(rec).get();
     } catch (InterruptedException e) {
@@ -157,7 +158,13 @@ public class KafkaSourceEmbeddedKafka {
   }
 
   public void produce(String topic, int partition, String k, byte[] v) {
-    ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, partition, k, v);
+    this.produce(topic, partition, null, k, v, null);
+  }
+
+  public void produce(String topic, int partition, Long timestamp, String k, byte[] v,
+      Headers headers) {
+    ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, partition, timestamp, k, v,
+        headers);
     try {
       producer.send(rec).get();
     } catch (InterruptedException e) {
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index ee913c9e6..d5caf7ceb 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -44,6 +44,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Time;
@@ -63,7 +66,10 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -834,6 +840,49 @@ public class TestKafkaSource {
     events.clear();
   }
 
+
+  /**
+   * Tests that the Kafka record timestamp and mapped Kafka record headers
+   * appear in the output event headers, per the configuration added in FLUME-3435
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicKafkaHeaderSet() throws InterruptedException, EventDeliveryException {
+    final String correlatorHeader = "FLUME_CORRELATOR";
+    context.put(TOPICS, topic0);
+    context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
+    context.put(KafkaSourceConstants.KAFKA_HEADER + "correlator", correlatorHeader);
+    context.put(TIMESTAMP_HEADER, "true");
+    kafkaSource.configure(context);
+
+    startKafkaSource();
+
+    Thread.sleep(500L);
+
+    long date = ZonedDateTime.of(2022, 10, 7, 8, 0, 0, 0,
+            ZoneId.systemDefault()).toInstant().toEpochMilli();
+    Headers headers = new RecordHeaders();
+    headers.add(new RecordHeader(correlatorHeader, "12345".getBytes(StandardCharsets.UTF_8)));
+    kafkaServer.produce(topic0, 0, date, "", "hello, world2".getBytes(StandardCharsets.UTF_8),
+        headers);
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world2", new String(events.get(0).getBody(),
+        Charsets.UTF_8));
+    Map<String, String> flumeHeaders = events.get(0).getHeaders();
+    Assert.assertEquals(Long.toString(date), flumeHeaders.get("timestamp"));
+    Assert.assertEquals(topic0, flumeHeaders.get("customTopicHeader"));
+    Assert.assertEquals("12345", flumeHeaders.get("correlator"));
+
+    kafkaSource.stop();
+    events.clear();
+  }
+
+
   /**
    * Tests the unavailability of the topic header in the output events,
    * based on the configuration parameters added in FLUME-3046