You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/22 00:44:39 UTC

[pinot] branch master updated: Extract record keys, headers and metadata from Pulsar sources (#10995)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe2b013a65 Extract record keys, headers and metadata from Pulsar sources (#10995)
fe2b013a65 is described below

commit fe2b013a657e1ad6ac508a9a37933961bc4c408b
Author: Jeff Bolle <Je...@users.noreply.github.com>
AuthorDate: Fri Jul 21 20:44:33 2023 -0400

    Extract record keys, headers and metadata from Pulsar sources (#10995)
---
 .../pinot/plugin/stream/pulsar/PulsarConfig.java   |  43 ++++-
 .../plugin/stream/pulsar/PulsarMessageBatch.java   |  51 +-----
 .../stream/pulsar/PulsarMetadataExtractor.java     | 182 +++++++++++++++++++++
 .../PulsarPartitionLevelConnectionHandler.java     |   3 +-
 .../pulsar/PulsarPartitionLevelConsumer.java       |  26 +--
 .../stream/pulsar/PulsarStreamLevelConsumer.java   |   4 +-
 .../plugin/stream/pulsar/PulsarStreamMessage.java  |  47 ++++++
 .../stream/pulsar/PulsarStreamMessageMetadata.java |  76 +++++++++
 .../pinot/plugin/stream/pulsar/PulsarUtils.java    |  40 +++++
 .../plugin/stream/pulsar/PulsarConfigTest.java     | 118 +++++++++++++
 .../plugin/stream/pulsar/PulsarConsumerTest.java   |  34 ++--
 .../stream/pulsar/PulsarMessageBatchTest.java      |  36 ++--
 .../stream/pulsar/PulsarMetadataExtractorTest.java |  92 +++++++++++
 .../pinot/spi/stream/StreamDataDecoderImpl.java    |   9 +-
 14 files changed, 669 insertions(+), 92 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 73ea2eca4b..4094a93f06 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -19,7 +19,13 @@
 package org.apache.pinot.plugin.stream.pulsar;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -37,6 +43,7 @@ public class PulsarConfig {
   public static final String AUTHENTICATION_TOKEN = "authenticationToken";
   public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
   public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
+  public static final String METADATA_FIELDS = "metadataFields"; // comma-separated list of metadata fields
 
   private final String _pulsarTopicName;
   private final String _subscriberId;
@@ -45,8 +52,10 @@ public class PulsarConfig {
   private final SubscriptionInitialPosition _subscriptionInitialPosition;
   private final String _authenticationToken;
   private final String _tlsTrustCertsFilePath;
+  @Deprecated(since = "v0.13.* since pulsar supports record key extraction")
   private final boolean _enableKeyValueStitch;
-
+  private final boolean _populateMetadata;
+  private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> _metadataFields;
   public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
     _pulsarTopicName = streamConfig.getTopicName();
@@ -71,8 +80,32 @@ public class PulsarConfig {
 
     _subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
     _initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
+    _populateMetadata = Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+        "false"));
+    String metadataFieldsToExtractCSV = streamConfig.getStreamConfigsMap().getOrDefault(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, METADATA_FIELDS), "");
+    if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata) {
+      _metadataFields = Collections.emptySet();
+    } else {
+      _metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV);
+    }
   }
 
+  private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> parseConfigStringToEnumSet(
+      String listOfMetadataFields) {
+    try {
+      String[] metadataFieldsArr = listOfMetadataFields.split(",");
+      return Stream.of(metadataFieldsArr)
+          .map(String::trim)
+          .filter(StringUtils::isNotBlank)
+          .map(PulsarStreamMessageMetadata.PulsarMessageMetadataValue::findByKey)
+          .filter(Objects::nonNull)
+          .collect(Collectors.toSet());
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Invalid metadata fields list: " + listOfMetadataFields, e);
+    }
+  }
   public String getPulsarTopicName() {
     return _pulsarTopicName;
   }
@@ -100,8 +133,14 @@ public class PulsarConfig {
   public String getTlsTrustCertsFilePath() {
     return _tlsTrustCertsFilePath;
   }
-
   public boolean getEnableKeyValueStitch() {
     return _enableKeyValueStitch;
   }
+  public boolean isPopulateMetadata() {
+    return _populateMetadata;
+  }
+
+  public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> getMetadataFields() {
+    return _metadataFields;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
index 6df313b722..912e8bef23 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
@@ -21,16 +21,12 @@ package org.apache.pinot.plugin.stream.pulsar;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -39,13 +35,11 @@ import org.slf4j.LoggerFactory;
  * plugins will not work. A custom decoder will be needed to unpack key and value byte arrays and decode
  * them independently.
  */
-public class PulsarMessageBatch implements MessageBatch<byte[]> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageBatch.class);
-  private final List<Message<byte[]>> _messageList = new ArrayList<>();
-  private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
+public class PulsarMessageBatch implements MessageBatch<PulsarStreamMessage> {
+  private final List<PulsarStreamMessage> _messageList = new ArrayList<>();
   private final boolean _enableKeyValueStitch;
 
-  public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean enableKeyValueStitch) {
+  public PulsarMessageBatch(Iterable<PulsarStreamMessage> iterable, boolean enableKeyValueStitch) {
     iterable.forEach(_messageList::add);
     _enableKeyValueStitch = enableKeyValueStitch;
   }
@@ -56,26 +50,19 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
   }
 
   @Override
-  public byte[] getMessageAtIndex(int index) {
-    Message<byte[]> msg = _messageList.get(index);
-    if (_enableKeyValueStitch) {
-      return stitchKeyValue(msg.getKeyBytes(), msg.getData());
-    }
-    return msg.getData();
+  public PulsarStreamMessage getMessageAtIndex(int index) {
+    return _messageList.get(index);
   }
 
   @Override
   public int getMessageOffsetAtIndex(int index) {
-    return ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset();
+    return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
   }
 
   @Override
   public int getMessageLengthAtIndex(int index) {
-    if (_enableKeyValueStitch) {
-      Message<byte[]> msg = _messageList.get(index);
-      return 8 + msg.getKeyBytes().length + msg.getData().length;
-    }
-    return _messageList.get(index).getData().length;
+    return _messageList.get(index).getValue().length; // If _enableKeyValueStitch is true, the key and
+    // value are already stitched together in the consumer; otherwise the value is the raw message value.
   }
 
   /**
@@ -123,26 +110,4 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
   public long getNextStreamMessageOffsetAtIndex(int index) {
     throw new UnsupportedOperationException("Pulsar does not support long stream offsets");
   }
-
-  /**
-   * Stitch key and value bytes together using a simple format:
-   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
-   */
-  private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
-    int keyLen = keyBytes.length;
-    int valueLen = valueBytes.length;
-    int totalByteArrayLength = 8 + keyLen + valueLen;
-    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) {
-      LENGTH_BUF.clear();
-      bos.write(LENGTH_BUF.putInt(keyLen).array());
-      bos.write(keyBytes);
-      LENGTH_BUF.clear();
-      bos.write(LENGTH_BUF.putInt(valueLen).array());
-      bos.write(valueBytes);
-      return bos.toByteArray();
-    } catch (Exception e) {
-      LOGGER.error("Unable to stitch key and value bytes together", e);
-    }
-    return null;
-  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
new file mode 100644
index 0000000000..c33c32e7cb
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarMetadataExtractor {
+  static PulsarMetadataExtractor build(boolean populateMetadata,
+      Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) {
+    return message -> {
+      long publishTime = message.getPublishTime();
+      long brokerPublishTime = message.getBrokerPublishTime().orElse(0L);
+      long recordTimestamp = brokerPublishTime != 0 ? brokerPublishTime : publishTime;
+
+      Map<String, String> metadataMap = populateMetadataMap(populateMetadata, message, metadataValuesToExtract);
+
+      GenericRow headerGenericRow = populateMetadata ? buildGenericRow(message) : null;
+      return new PulsarStreamMessageMetadata(recordTimestamp, headerGenericRow, metadataMap);
+    };
+  }
+
+  RowMetadata extract(Message<?> record);
+
+  static GenericRow buildGenericRow(Message<?> message) {
+    if (MapUtils.isEmpty(message.getProperties())) {
+      return null;
+    }
+    GenericRow genericRow = new GenericRow();
+    for (Map.Entry<String, String> entry : message.getProperties().entrySet()) {
+      genericRow.putValue(entry.getKey(), entry.getValue());
+    }
+    return genericRow;
+  }
+
+  static Map<String, String> populateMetadataMap(boolean populateAllFields, Message<?> message,
+      Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) {
+
+    Map<String, String> metadataMap = new HashMap<>();
+    populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME, message, metadataMap);
+    populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME, message, metadataMap);
+    populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, message,
+        metadataMap);
+    populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY, message, metadataMap);
+
+    // The timestamps (for lag calculation) and message key above are populated even if populateMetadata is false
+
+    if (!populateAllFields) {
+      return metadataMap;
+    }
+
+    for (PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue : metadataValuesToExtract) {
+      populateMetadataField(metadataValue, message, metadataMap);
+    }
+
+    return metadataMap;
+  }
+
+  private static void populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue value,
+      Message<?> message, Map<String, String> metadataMap) {
+    switch (value) {
+      case PUBLISH_TIME:
+        long publishTime = message.getPublishTime();
+        if (publishTime > 0) {
+          setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+              publishTime);
+        }
+        break;
+      case EVENT_TIME:
+        long eventTime = message.getEventTime();
+        if (eventTime > 0) {
+          setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+              eventTime);
+        }
+        break;
+      case BROKER_PUBLISH_TIME:
+        message.getBrokerPublishTime()
+            .ifPresent(brokerPublishTime -> setMetadataMapField(metadataMap,
+                PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, brokerPublishTime));
+        break;
+      case MESSAGE_KEY:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+            message.getKey());
+        break;
+      case MESSAGE_ID:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+            message.getMessageId().toString());
+        break;
+      case MESSAGE_ID_BYTES_B64:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+            message.getMessageId().toByteArray());
+        break;
+      case PRODUCER_NAME:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PRODUCER_NAME,
+            message.getProducerName());
+        break;
+      case SCHEMA_VERSION:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SCHEMA_VERSION,
+            message.getSchemaVersion());
+        break;
+      case SEQUENCE_ID:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SEQUENCE_ID,
+            message.getSequenceId());
+        break;
+      case ORDERING_KEY:
+        if (message.hasOrderingKey()) {
+          setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.ORDERING_KEY,
+              message.getOrderingKey());
+        }
+        break;
+      case SIZE:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SIZE,
+            message.size());
+        break;
+      case TOPIC_NAME:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME,
+            message.getTopicName());
+        break;
+      case INDEX:
+        message.getIndex().ifPresent(index -> setMetadataMapField(metadataMap,
+            PulsarStreamMessageMetadata.PulsarMessageMetadataValue.INDEX, index));
+        break;
+      case REDELIVERY_COUNT:
+        setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.REDELIVERY_COUNT,
+            message.getRedeliveryCount());
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported metadata value: " + value);
+    }
+  }
+
+  private static void setMetadataMapField(Map<String, String> metadataMap,
+      PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+      String value) {
+    if (StringUtils.isNotBlank(value)) {
+      metadataMap.put(metadataValue.getKey(), value);
+    }
+  }
+
+  private static void setMetadataMapField(Map<String, String> metadataMap,
+      PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+      int value) {
+    setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+  }
+
+  private static void setMetadataMapField(Map<String, String> metadataMap,
+      PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+      long value) {
+    setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+  }
+
+  private static void setMetadataMapField(Map<String, String> metadataMap,
+      PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+      byte[] value) {
+    if (value != null && value.length > 0) {
+      setMetadataMapField(metadataMap, metadataValue, Base64.getEncoder().encodeToString(value));
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 3ad57b55bb..11033ec716 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -40,6 +40,7 @@ public class PulsarPartitionLevelConnectionHandler {
   protected final PulsarConfig _config;
   protected final String _clientId;
   protected PulsarClient _pulsarClient = null;
+  protected final PulsarMetadataExtractor _pulsarMetadataExtractor;
 
   /**
    * Creates a new instance of {@link PulsarClient} and {@link Reader}
@@ -47,7 +48,7 @@ public class PulsarPartitionLevelConnectionHandler {
   public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig) {
     _config = new PulsarConfig(streamConfig, clientId);
     _clientId = clientId;
-
+    _pulsarMetadataExtractor = PulsarMetadataExtractor.build(_config.isPopulateMetadata(), _config.getMetadataFields());
     try {
       ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
       if (_config.getTlsTrustCertsFilePath() != null) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
index 0e55a07aa3..d1b80b0360 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -48,15 +47,15 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
   private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
   private final ExecutorService _executorService;
   private final Reader _reader;
-  private boolean _enableKeyValueStitch = false;
+  private boolean _enableKeyValueStitch;
 
   public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
     super(clientId, streamConfig);
     PulsarConfig config = new PulsarConfig(streamConfig, clientId);
-    _reader = createReaderForPartition(config.getPulsarTopicName(),
-        partitionGroupConsumptionStatus.getPartitionGroupId(),
-        config.getInitialMessageId());
+    _reader =
+        createReaderForPartition(config.getPulsarTopicName(), partitionGroupConsumptionStatus.getPartitionGroupId(),
+            config.getInitialMessageId());
     LOGGER.info("Created pulsar reader with id {} for topic {} partition {}", _reader, _config.getPulsarTopicName(),
         partitionGroupConsumptionStatus.getPartitionGroupId());
     _executorService = Executors.newSingleThreadExecutor();
@@ -64,19 +63,19 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
   }
 
   /**
-   * Fetch records from the Pulsar stream between the start and end KinesisCheckpoint
+   * Fetch records from the Pulsar stream between the start and end StreamPartitionMsgOffset
    * Used {@link org.apache.pulsar.client.api.Reader} to read the messaged from pulsar partitioned topic
    * The reader seeks to the startMsgOffset and starts reading records in a loop until endMsgOffset or timeout is
    * reached.
    */
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis) {
+  public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId();
     final MessageId endMessageId =
         endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
 
-    List<Message<byte[]>> messagesList = new ArrayList<>();
+    List<PulsarStreamMessage> messagesList = new ArrayList<>();
     Future<PulsarMessageBatch> pulsarResultFuture =
         _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
 
@@ -96,7 +95,7 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
   }
 
   public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId,
-      List<Message<byte[]>> messagesList) {
+      List<PulsarStreamMessage> messagesList) {
     try {
       _reader.seek(startMessageId);
 
@@ -108,7 +107,8 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
             break;
           }
         }
-        messagesList.add(nextMessage);
+        messagesList.add(
+            PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor));
 
         if (Thread.interrupted()) {
           break;
@@ -124,11 +124,11 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
     }
   }
 
-  private Iterable<Message<byte[]>> buildOffsetFilteringIterable(final List<Message<byte[]>> messageAndOffsets,
+  private Iterable<PulsarStreamMessage> buildOffsetFilteringIterable(final List<PulsarStreamMessage> messageAndOffsets,
       final MessageId startOffset, final MessageId endOffset) {
     return Iterables.filter(messageAndOffsets, input -> {
       // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.getData() != null && (input.getMessageId().compareTo(startOffset) >= 0) && (
+      return input != null && input.getValue() != null && (input.getMessageId().compareTo(startOffset) >= 0) && (
           (endOffset == null) || (input.getMessageId().compareTo(endOffset) < 0));
     });
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
index 60272c6212..82040f6de3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
@@ -81,9 +81,9 @@ public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
         // Log every minute or 100k events
         if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 100000) {
           if (_lastCount == 0) {
-            _logger.info("Consumed {} events from kafka stream {}", _currentCount, _streamConfig.getTopicName());
+            _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName());
           } else {
-            _logger.info("Consumed {} events from kafka stream {} (rate:{}/s)", _currentCount - _lastCount,
+            _logger.info("Consumed {} events from pulsar stream {} (rate:{}/s)", _currentCount - _lastCount,
                 _streamConfig.getTopicName(), (float) (_currentCount - _lastCount) * 1000 / (now - _lastLogTime));
           }
           _lastCount = _currentCount;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
new file mode 100644
index 0000000000..7e09197857
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarStreamMessage extends StreamMessage<byte[]> {
+
+    private final MessageId _messageId;
+    public PulsarStreamMessage(@Nullable byte[] key, byte[] value, MessageId messageId,
+                                @Nullable PulsarStreamMessageMetadata metadata, int length) {
+        super(key, value, metadata, length);
+        _messageId = messageId;
+    }
+
+    public MessageId getMessageId() {
+        return _messageId;
+    }
+
+    int getKeyLength() {
+        byte[] key = getKey();
+        return key == null ? 0 : key.length;
+    }
+
+    int getValueLength() {
+        byte[] value = getValue();
+        return value == null ? 0 : value.length;
+    }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
new file mode 100644
index 0000000000..59220138d7
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.EnumSet;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+
+/**
+ * Pulsar specific implementation of {@link StreamMessageMetadata}
+ * Pulsar makes many metadata values available for each message. Please see the pulsar documentation for more details.
+ * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#message-properties">Pulsar Message Properties</a>
+ */
+public class PulsarStreamMessageMetadata extends StreamMessageMetadata {
+
+    public enum PulsarMessageMetadataValue {
+        PUBLISH_TIME("publishTime"),
+        EVENT_TIME("eventTime"),
+        BROKER_PUBLISH_TIME("brokerPublishTime"),
+        MESSAGE_KEY("key"),
+        MESSAGE_ID("messageId"),
+        MESSAGE_ID_BYTES_B64("messageIdBytes"),
+        PRODUCER_NAME("producerName"),
+        SCHEMA_VERSION("schemaVersion"),
+        SEQUENCE_ID("sequenceId"),
+        ORDERING_KEY("orderingKey"),
+        SIZE("size"),
+        TOPIC_NAME("topicName"),
+        INDEX("index"),
+        REDELIVERY_COUNT("redeliveryCount");
+
+        private final String _key;
+
+        PulsarMessageMetadataValue(String key) {
+            _key = key;
+        }
+
+        public String getKey() {
+            return _key;
+        }
+
+        public static PulsarMessageMetadataValue findByKey(final String key) {
+            EnumSet<PulsarMessageMetadataValue> values = EnumSet.allOf(PulsarMessageMetadataValue.class);
+            return values.stream().filter(value -> value.getKey().equals(key)).findFirst().orElse(null);
+        }
+    }
+
+    public PulsarStreamMessageMetadata(long recordIngestionTimeMs,
+                                        @Nullable GenericRow headers) {
+        super(recordIngestionTimeMs, headers);
+    }
+
+    public PulsarStreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+                                        Map<String, String> metadata) {
+        super(recordIngestionTimeMs, headers, metadata);
+    }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index 763b0fc0d4..d22f8b0b5f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,13 +18,22 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class PulsarUtils {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarUtils.class);
+
+  private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
+
   private PulsarUtils() {
   }
 
@@ -51,4 +60,35 @@ public class PulsarUtils {
 
     throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
   }
+
+  /**
+   * Stitch key and value bytes together using a simple format:
+   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes.
+   * The length scratch buffer is local to each call: ByteBuffer is not thread-safe, so it must not be shared.
+   * @return the stitched bytes, or {@code null} if the write failed (the failure is logged)
+   */
+  protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    int keyLen = keyBytes.length;
+    int valueLen = valueBytes.length;
+    ByteBuffer lengthBuf = ByteBuffer.allocate(Integer.BYTES);
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(2 * Integer.BYTES + keyLen + valueLen)) {
+      bos.write(lengthBuf.putInt(0, keyLen).array());
+      bos.write(keyBytes);
+      bos.write(lengthBuf.putInt(0, valueLen).array());
+      bos.write(valueBytes);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      LOGGER.error("Unable to stitch key and value bytes together", e);
+    }
+    return null;
+  }
+
+  protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+      PulsarMetadataExtractor pulsarMetadataExtractor) {
+    byte[] keyBytes = message.getKeyBytes();
+    byte[] payload = enableKeyValueStitch ? stitchKeyValue(keyBytes, message.getData()) : message.getData();
+    // stitchKeyValue signals failure with null; such a payload is passed on with a reported length of 0.
+    return new PulsarStreamMessage(keyBytes, payload, message.getMessageId(),
+        (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), payload == null ? 0 : payload.length);
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
new file mode 100644
index 0000000000..ad23c83ce0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PulsarConfigTest {
+  public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
+
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String STREAM_PULSAR_BROKER_LIST = "pulsar://localhost:6650";
+  public static final String STREAM_PULSAR_CONSUMER_TYPE = "simple";
+
+  // Metadata field list used by the "configured fields" tests; note the uneven spacing and trailing comma.
+  private static final String CONFIGURED_FIELDS =
+      "messageId,messageIdBytes, publishTime, eventTime, key, topicName, ";
+
+  /**
+   * Builds the baseline stream config shared by all tests; each test layers its metadata properties on top.
+   */
+  Map<String, String> getCommonStreamConfigMap() {
+    Map<String, String> props = new HashMap<>();
+    props.put("streamType", STREAM_TYPE);
+    props.put("stream.pulsar.consumer.type", STREAM_PULSAR_CONSUMER_TYPE);
+    props.put("stream.pulsar.topic.name", "test-topic");
+    props.put("stream.pulsar.bootstrap.servers", STREAM_PULSAR_BROKER_LIST);
+    props.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
+    props.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
+    props.put(pulsarProperty(StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "1000");
+    props.put("stream.pulsar.decoder.class.name", "decoderClass");
+    return props;
+  }
+
+  /**
+   * Qualifies {@code property} for the Pulsar stream type through {@link StreamConfigProperties}.
+   */
+  private static String pulsarProperty(String property) {
+    return StreamConfigProperties.constructStreamProperty(STREAM_TYPE, property);
+  }
+
+  /** Wraps {@code props} in a {@link StreamConfig} for the test table and parses it into a {@link PulsarConfig}. */
+  private static PulsarConfig parse(Map<String, String> props) {
+    return new PulsarConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, props), "testId");
+  }
+
+  /** Population enabled with an explicit field list: exactly the six listed fields are selected. */
+  @Test
+  public void testParsingMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> props = getCommonStreamConfigMap();
+    props.put(pulsarProperty(StreamConfigProperties.METADATA_POPULATE), "true");
+    props.put(pulsarProperty(PulsarConfig.METADATA_FIELDS), CONFIGURED_FIELDS);
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> fields = parse(props).getMetadataFields();
+    Assert.assertEquals(fields.size(), 6);
+    Assert.assertTrue(fields.containsAll(List.of(
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME)));
+  }
+
+  /** Population enabled but no field list configured: no metadata fields are selected. */
+  @Test
+  public void testParsingMetadataConfigWithoutConfiguredFields() throws Exception {
+    Map<String, String> props = getCommonStreamConfigMap();
+    props.put(pulsarProperty(StreamConfigProperties.METADATA_POPULATE), "true");
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> fields = parse(props).getMetadataFields();
+    Assert.assertEquals(fields.size(), 0);
+  }
+
+  /** Population disabled and no field list configured: no metadata fields are selected. */
+  @Test
+  public void testParsingNoMetadataConfig() throws Exception {
+    Map<String, String> props = getCommonStreamConfigMap();
+    props.put(pulsarProperty(StreamConfigProperties.METADATA_POPULATE), "false");
+    PulsarConfig pulsarConfig = parse(props);
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> fields = pulsarConfig.getMetadataFields();
+    Assert.assertEquals(fields.size(), 0);
+  }
+
+  /** Population disabled: a configured field list has no effect and no metadata fields are selected. */
+  @Test
+  public void testParsingNoMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> props = getCommonStreamConfigMap();
+    props.put(pulsarProperty(StreamConfigProperties.METADATA_POPULATE), "false");
+    props.put(pulsarProperty(PulsarConfig.METADATA_FIELDS), CONFIGURED_FIELDS);
+    PulsarConfig pulsarConfig = parse(props);
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> fields = pulsarConfig.getMetadataFields();
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Assert.assertEquals(fields.size(), 0);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 4779e6afcd..9d59f82fcc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -281,34 +281,35 @@ public class PulsarConsumerTest {
 
       int totalMessagesReceived = 0;
 
-      final PartitionGroupConsumer consumer =
-          streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
-      final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+      final PulsarPartitionLevelConsumer consumer =
+              (PulsarPartitionLevelConsumer) streamConsumerFactory
+                      .createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+      final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
           new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+        final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + i);
         totalMessagesReceived++;
       }
 
-      final MessageBatch messageBatch2 =
+      final PulsarMessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
               CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+        final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
         totalMessagesReceived++;
       }
 
-      final MessageBatch messageBatch3 =
+      final PulsarMessageBatch messageBatch3 =
           consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
               new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
               CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+        final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
       }
 
@@ -333,36 +334,37 @@ public class PulsarConsumerTest {
 
       int totalMessagesReceived = 0;
 
-      final PartitionGroupConsumer consumer =
-          streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+      final PulsarPartitionLevelConsumer consumer =
+              (PulsarPartitionLevelConsumer) streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
+                  partitionGroupConsumptionStatus);
       //TODO: This test failed, check it out.
-      final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+      final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
           new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
           CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch1.getMessageCount(), 500);
       for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+        final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + i);
         totalMessagesReceived++;
       }
 
-      final MessageBatch messageBatch2 =
+      final PulsarMessageBatch messageBatch2 =
           consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null,
               CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch2.getMessageCount(), 500);
       for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+        final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
         totalMessagesReceived++;
       }
 
-      final MessageBatch messageBatch3 =
+      final PulsarMessageBatch messageBatch3 =
           consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
               new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
               CONSUMER_FETCH_TIMEOUT_MILLIS);
       Assert.assertEquals(messageBatch3.getMessageCount(), 25);
       for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+        final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
       }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
index 7cc0a99a6c..904dd33a04 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -20,10 +20,13 @@ package org.apache.pinot.plugin.stream.pulsar;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
@@ -37,30 +40,33 @@ public class PulsarMessageBatchTest {
   private DummyPulsarMessage _msgWithKeyAndValue;
   private byte[] _expectedValueBytes;
   private byte[] _expectedKeyBytes;
-  private List<Message<byte[]>> _messageList;
+  private List<DummyPulsarMessage> _messageList;
+  private PulsarMetadataExtractor _metadataExtractor;
 
-  class DummyPulsarMessage implements Message<byte[]> {
+  public static class DummyPulsarMessage implements Message<byte[]> {
     private final byte[] _keyData;
     private final byte[] _valueData;
+    private Map<String, String> _properties;
 
     public DummyPulsarMessage(byte[] key, byte[] value) {
       _keyData = key;
       _valueData = value;
+      _properties = new HashMap<>();
     }
 
     @Override
     public Map<String, String> getProperties() {
-      return null;
+      return _properties;
     }
 
     @Override
     public boolean hasProperty(String name) {
-      return false;
+      return _properties.containsKey(name);
     }
 
     @Override
     public String getProperty(String name) {
-      return null;
+      return _properties.get(name);
     }
 
     @Override
@@ -80,7 +86,7 @@ public class PulsarMessageBatchTest {
 
     @Override
     public MessageId getMessageId() {
-      return null;
+      return MessageId.earliest;
     }
 
     @Override
@@ -110,7 +116,7 @@ public class PulsarMessageBatchTest {
 
     @Override
     public String getKey() {
-      return _keyData.toString();
+      return new String(_keyData);
     }
 
     @Override
@@ -196,20 +202,28 @@ public class PulsarMessageBatchTest {
     _random.nextBytes(_expectedKeyBytes);
     _msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes, _expectedValueBytes);
     _messageList = new ArrayList<>();
+    _metadataExtractor = PulsarMetadataExtractor.build(true,
+        EnumSet.allOf(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.class));
     _messageList.add(_msgWithKeyAndValue);
   }
 
   @Test
   public void testMessageBatchNoStitching() {
-    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, false);
-    byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+    List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+            PulsarUtils.buildPulsarStreamMessage(message, false, _metadataExtractor))
+            .collect(Collectors.toList());
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, false);
+    byte[] valueBytes = messageBatch.getMessageAtIndex(0).getValue();
     Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
   }
 
   @Test
   public void testMessageBatchWithStitching() {
-    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, true);
-    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+    List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+                    PulsarUtils.buildPulsarStreamMessage(message, true, _metadataExtractor))
+            .collect(Collectors.toList());
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, true);
+    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0).getValue();
     Assert.assertEquals(keyValueBytes.length, 8 + _expectedKeyBytes.length + _expectedValueBytes.length);
     try {
       ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
new file mode 100644
index 0000000000..4c8e28021a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import org.apache.pulsar.client.api.MessageId;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.plugin.stream.pulsar.PulsarMessageBatchTest.DummyPulsarMessage;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY;
+import static org.testng.Assert.assertEquals;
+
+
+/** Covers metadata extraction and stream-message building; assertions use TestNG's (actual, expected) order. */
+public class PulsarMetadataExtractorTest {
+  private PulsarMetadataExtractor _metadataExtractor;
+
+  @BeforeClass
+  public void setup() {
+    _metadataExtractor = PulsarMetadataExtractor.build(true, Set.of(MESSAGE_ID, MESSAGE_ID_BYTES_B64, MESSAGE_KEY));
+  }
+
+  /** Message properties surface as headers; the key and message id surface as record metadata. */
+  @Test
+  public void testExtractProperty() throws Exception {
+    DummyPulsarMessage pulsarMessage =
+        new DummyPulsarMessage("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
+    pulsarMessage.getProperties().put("test_key", "test_value");
+    pulsarMessage.getProperties().put("test_key2", "2");
+    PulsarStreamMessageMetadata metadata = (PulsarStreamMessageMetadata) _metadataExtractor.extract(pulsarMessage);
+    assertEquals(metadata.getHeaders().getValue("test_key"), "test_value");
+    assertEquals(metadata.getHeaders().getValue("test_key2"), "2");
+    assertEquals(metadata.getRecordMetadata().get(MESSAGE_KEY.getKey()), "key");
+    String messageIdStr = metadata.getRecordMetadata().get(MESSAGE_ID.getKey());
+    assertEquals(messageIdStr, pulsarMessage.getMessageId().toString());
+    // The Base64-encoded metadata value must decode back to the message id the dummy message reports.
+    byte[] messageIdBytes = Base64.decode(metadata.getRecordMetadata().get(MESSAGE_ID_BYTES_B64.getKey()));
+    MessageId messageId = MessageId.fromByteArray(messageIdBytes);
+    assertEquals(messageId, MessageId.earliest);
+  }
+
+  /** Without stitching, the stream message exposes the raw key and value bytes and their lengths. */
+  @Test
+  public void testPulsarSteamMessageUnstitched() {
+    String key = "key";
+    String value = "value";
+    DummyPulsarMessage message =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage = PulsarUtils.buildPulsarStreamMessage(message, false, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), value.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), value.getBytes(StandardCharsets.UTF_8).length);
+  }
+
+  /** With stitching, the value is the stitched key/value payload while the key stays the raw key bytes. */
+  @Test
+  public void testPulsarSteamMessageStitched() {
+    String key = "key";
+    String value = "value";
+    byte[] stitchedValueBytes =
+        PulsarUtils.stitchKeyValue(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    DummyPulsarMessage message =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage = PulsarUtils.buildPulsarStreamMessage(message, true, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), stitchedValueBytes);
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), stitchedValueBytes.length);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 97958b92d3..b570067a69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -50,10 +50,11 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
           row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8));
         }
         RowMetadata metadata = message.getMetadata();
-        if (metadata != null && metadata.getHeaders() != null) {
-          metadata.getHeaders().getFieldToValueMap()
-              .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
-
+        if (metadata != null) {
+          if (metadata.getHeaders() != null) {
+            metadata.getHeaders().getFieldToValueMap()
+                    .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
+          }
           metadata.getRecordMetadata()
                   .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org