You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2022/07/23 18:47:48 UTC

[pinot] branch message-header created (now f89cc3b53c)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch message-header
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at f89cc3b53c Adding support to extract values from message header.. initial support for kafka headers

This branch includes the following new commits:

     new f89cc3b53c Adding support to extract values from message header.. initial support for kafka headers

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Adding support to extract values from message header.. initial support for kafka headers

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch message-header
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit f89cc3b53c52b2481575f92c1245aab844e6ea22
Author: kishoreg <g....@gmail.com>
AuthorDate: Sat Jul 23 11:47:30 2022 -0700

    Adding support to extract values from message header.. initial support for kafka headers
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  8 +++++++-
 .../kafka20/KafkaPartitionLevelConsumer.java       | 13 +++++++++++-
 .../stream/kafka20/RowMetadataExtractor.java       | 23 +++++++++++++++++++++-
 .../org/apache/pinot/spi/stream/RowMetadata.java   |  6 ++++++
 .../pinot/spi/stream/StreamMessageMetadata.java    | 14 +++++++++++++
 5 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fdfe31377e..1de9c818ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -527,8 +527,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
 
       GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
+          .decode(messagesAndOffsets.getMessageAtIndex(index),
+              messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
+      // Copy stream headers into the row. Guarded because decodedRow is null-checked
+      // right below and the consumer may supply no metadata for a message.
+      if (decodedRow != null && msgMetadata != null && msgMetadata.getHeaders() != null) {
+        msgMetadata.getHeaders().getFieldToValueMap().forEach(decodedRow::putValue);
+      }
       if (decodedRow != null) {
         try {
           _transformPipeline.processRow(decodedRow, reusedResult);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index bf212cd855..31292cc09c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -23,11 +23,16 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +43,8 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
+  GenericRow _headers = new GenericRow();
+
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
@@ -65,8 +72,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
+          RowMetadata rowMetadata = null;
+          if (_config.isPopulateMetadata()) {
+            rowMetadata = _rowMetadataExtractor.extract(messageAndOffset);
+          }
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset)));
+              new MessageAndOffsetAndMetadata(message.get(), offset, rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("tombstone message at offset {}", offset);
         }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
index 81402bb7d5..e36bb331f4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
@@ -19,6 +19,9 @@
 package org.apache.pinot.plugin.stream.kafka20;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 
@@ -26,7 +29,25 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
 @FunctionalInterface
 public interface RowMetadataExtractor {
   static RowMetadataExtractor build(boolean populateMetadata) {
-    return populateMetadata ? record -> new StreamMessageMetadata(record.timestamp()) : record -> null;
+    return record -> {
+      // Metadata extraction is disabled, so no RowMetadata is produced for the record.
+      if (!populateMetadata) {
+        return null;
+      }
+      StreamMessageMetadata streamMessageMetadata = new StreamMessageMetadata(record.timestamp());
+      Headers headers = record.headers();
+      if (headers != null) {
+        // Copy each Kafka header into a GenericRow keyed by header name; values stay
+        // as whatever Header.value() returns (no decoding is done here).
+        GenericRow headerGenericRow = new GenericRow();
+        Header[] headersArray = headers.toArray();
+        for (Header header : headersArray) {
+          headerGenericRow.putValue(header.key(), header.value());
+        }
+        streamMessageMetadata.setHeaders(headerGenericRow);
+      }
+      return streamMessageMetadata;
+    };
   }
 
   RowMetadata extract(ConsumerRecord<?, ?> consumerRecord);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index a8b0f2116f..fb19701d02 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
@@ -40,4 +41,9 @@ public interface RowMetadata {
    *         Long.MIN_VALUE if not available
    */
   long getIngestionTimeMs();
+
+  /** Returns the headers of the stream message as a GenericRow; this default returns null. */
+  default GenericRow getHeaders() {
+    return null;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 9991f34eac..d32deda9df 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.spi.stream;
 
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
 /**
  * A class that provides metadata associated with the message of a stream, for e.g.,
  * ingestion-timestamp of the message.
@@ -26,6 +29,8 @@ public class StreamMessageMetadata implements RowMetadata {
 
   private final long _ingestionTimeMs;
 
+  private GenericRow _headers; // null until setHeaders(...) is called
+
   /**
    * Construct the stream based message/row message metadata
    *
@@ -40,4 +45,13 @@ public class StreamMessageMetadata implements RowMetadata {
   public long getIngestionTimeMs() {
     return _ingestionTimeMs;
   }
+
+  @Override
+  public GenericRow getHeaders() {
+    return _headers;
+  }
+
+  public void setHeaders(GenericRow headers) {
+    _headers = headers;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org