You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/10/09 20:17:18 UTC

[pinot] branch master updated: Make StreamMessage generic (#9544)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d69ae1157 Make StreamMessage generic (#9544)
0d69ae1157 is described below

commit 0d69ae11570ef15fa663d750e7954838252a15ec
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Sun Oct 9 13:17:12 2022 -0700

    Make StreamMessage generic (#9544)
---
 .../plugin/stream/kafka20/KafkaMessageBatch.java   |  6 +++---
 .../kafka20/KafkaPartitionLevelConsumer.java       |  6 +++---
 .../plugin/stream/kafka20/KafkaStreamMessage.java  |  2 +-
 .../kafka20/KafkaPartitionLevelConsumerTest.java   | 12 ++++++------
 .../org/apache/pinot/spi/stream/MessageBatch.java  |  4 ++--
 .../pinot/spi/stream/StreamDataDecoderImpl.java    |  4 ++--
 .../org/apache/pinot/spi/stream/StreamMessage.java | 18 ++++++++++++------
 .../spi/stream/StreamDataDecoderImplTest.java      | 11 +++++++----
 .../apache/pinot/spi/stream/StreamMessageTest.java | 22 +++++++++++++++++-----
 9 files changed, 53 insertions(+), 32 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 4fd01562cc..1852c8bdc8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -27,8 +27,8 @@ import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
-public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
-  private final List<StreamMessage> _messageList;
+public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
+  private final List<StreamMessage<byte[]>> _messageList;
   private final int _unfilteredMessageCount;
   private final long _lastOffset;
 
@@ -37,7 +37,7 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage> batch) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) {
     _messageList = batch;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 59ee1c0eab..e7d272041c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -46,14 +46,14 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
   }
 
   @Override
-  public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+  public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
       StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch<StreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+  public MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
@@ -61,7 +61,7 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
+    List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
       String key = messageAndOffset.key();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
index 8124bea53d..f67dcf3f7d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
 
 public class KafkaStreamMessage extends StreamMessage {
   public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
-    super(key, value, metadata);
+    super(key, value, metadata, value.length);
   }
 
   public long getNextOffset() {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 0a2f7d2f5f..6c85f913f5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -281,7 +281,7 @@ public class KafkaPartitionLevelConsumerTest {
           consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch1.getMessageCount(), 500);
       for (int i = 0; i < batch1.getMessageCount(); i++) {
-        final byte[] msg = batch1.getStreamMessage(i).getValue();
+        final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + i);
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
@@ -290,7 +290,7 @@ public class KafkaPartitionLevelConsumerTest {
           consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch2.getMessageCount(), 500);
       for (int i = 0; i < batch2.getMessageCount(); i++) {
-        final byte[] msg = batch2.getStreamMessage(i).getValue();
+        final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
@@ -298,7 +298,7 @@ public class KafkaPartitionLevelConsumerTest {
       final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000);
       Assert.assertEquals(batch3.getMessageCount(), 25);
       for (int i = 0; i < batch3.getMessageCount(); i++) {
-        final byte[] msg = batch3.getStreamMessage(i).getValue();
+        final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
@@ -388,7 +388,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch1 = consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(400), 10000);
     Assert.assertEquals(batch1.getMessageCount(), 200);
     for (int i = 0; i < batch1.getMessageCount(); i++) {
-      byte[] msg = batch1.getStreamMessage(i).getValue();
+      byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
     }
     Assert.assertEquals(batch1.getOffsetOfNextBatch().toString(), "400");
@@ -400,7 +400,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(201), new LongMsgOffset(401), 10000);
     Assert.assertEquals(batch3.getMessageCount(), 200);
     for (int i = 0; i < batch3.getMessageCount(); i++) {
-      byte[] msg = batch3.getStreamMessage(i).getValue();
+      byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 201));
     }
     Assert.assertEquals(batch3.getOffsetOfNextBatch().toString(), "401");
@@ -408,7 +408,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch4 = consumer.fetchMessages(new LongMsgOffset(0), null, 10000);
     Assert.assertEquals(batch4.getMessageCount(), 500);
     for (int i = 0; i < batch4.getMessageCount(); i++) {
-      byte[] msg = batch4.getStreamMessage(i).getValue();
+      byte[] msg = (byte[]) batch4.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
     }
     Assert.assertEquals(batch4.getOffsetOfNextBatch().toString(), "700");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 0e80df0234..d22a46af67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -55,13 +55,13 @@ public interface MessageBatch<T> {
     return (byte[]) getMessageAtIndex(index);
   }
 
-  default StreamMessage getStreamMessage(int index) {
+  default StreamMessage<T> getStreamMessage(int index) {
     return new LegacyStreamMessage(getMessageBytesAtIndex(index));
   }
 
   class LegacyStreamMessage extends StreamMessage {
     public LegacyStreamMessage(byte[] value) {
-      super(value);
+      super(value, value.length);
     }
   }
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 3e69dbca03..97958b92d3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -44,13 +44,13 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
 
     try {
       _reuse.clear();
-      GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, _reuse);
+      GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getLength(), _reuse);
       if (row != null) {
         if (message.getKey() != null) {
           row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8));
         }
         RowMetadata metadata = message.getMetadata();
-        if (metadata != null) {
+        if (metadata != null && metadata.getHeaders() != null) {
           metadata.getHeaders().getFieldToValueMap()
               .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
index 6eaf099c12..e626dc8106 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
@@ -38,25 +38,31 @@ import javax.annotation.Nullable;
  * Additionally, the pinot table schema should refer these fields. Otherwise, even though the fields are extracted,
  * they will not materialize in the pinot table.
  */
-public class StreamMessage {
+public class StreamMessage<T> {
   private final byte[] _key;
-  private final byte[] _value;
+  private final T _value;
   protected final StreamMessageMetadata _metadata;
+  int _length = -1;
 
-  public StreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
+  public StreamMessage(@Nullable byte[] key, T value, @Nullable StreamMessageMetadata metadata, int length) {
     _key = key;
     _value = value;
     _metadata = metadata;
+    _length = length;
   }
 
-  public StreamMessage(byte[] value) {
-    this(null, value, null);
+  public StreamMessage(T value, int length) {
+    this(null, value, null, length);
   }
 
-  public byte[] getValue() {
+  public T getValue() {
     return _value;
   }
 
+  public int getLength() {
+    return _length;
+  }
+
   @Nullable
   public StreamMessageMetadata getMetadata() {
     return _metadata;
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index 0483b7d1ac..56bfb9b97b 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -40,7 +40,8 @@ public class StreamDataDecoderImplTest {
     TestDecoder messageDecoder = new TestDecoder();
     messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
     String value = "Alice";
-    StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+    StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
+        value.getBytes(StandardCharsets.UTF_8).length);
     StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
     Assert.assertNotNull(result);
     Assert.assertNull(result.getException());
@@ -62,8 +63,9 @@ public class StreamDataDecoderImplTest {
     headers.putValue(AGE_HEADER_KEY, 3);
     Map<String, String> recordMetadata = Collections.singletonMap(SEQNO_RECORD_METADATA, "1");
     StreamMessageMetadata metadata = new StreamMessageMetadata(1234L, headers, recordMetadata);
-    StreamMessage message = new StreamMessage(key.getBytes(StandardCharsets.UTF_8),
-        value.getBytes(StandardCharsets.UTF_8), metadata);
+    byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+    StreamMessage<byte[]> message =
+        new StreamMessage(key.getBytes(StandardCharsets.UTF_8), valueBytes, metadata, value.length());
 
     StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
     Assert.assertNotNull(result);
@@ -84,7 +86,8 @@ public class StreamDataDecoderImplTest {
     ThrowingDecoder messageDecoder = new ThrowingDecoder();
     messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
     String value = "Alice";
-    StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+    StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
+        value.getBytes(StandardCharsets.UTF_8).length);
     StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
     Assert.assertNotNull(result);
     Assert.assertNotNull(result.getException());
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
index 926761109b..10e5087493 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
@@ -27,17 +27,29 @@ public class StreamMessageTest {
 
   @Test
   public void testAllowNullKeyAndMetadata() {
-    StreamMessage msg = new StreamMessage("hello".getBytes(StandardCharsets.UTF_8));
+    String value = "hello";
+    byte[] valBytes = value.getBytes(StandardCharsets.UTF_8);
+    StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length);
     Assert.assertNull(msg.getKey());
     Assert.assertNull(msg.getMetadata());
-    Assert.assertEquals(new String(msg.getValue()), "hello");
+    Assert.assertEquals(new String(msg.getValue()), value);
 
-    StreamMessage msg1 = new StreamMessage("key".getBytes(StandardCharsets.UTF_8),
-        "value".getBytes(StandardCharsets.UTF_8), null);
+    value = "value";
+    valBytes = value.getBytes(StandardCharsets.UTF_8);
+    StreamMessage<byte[]> msg1 =
+        new StreamMessage("key".getBytes(StandardCharsets.UTF_8), valBytes, null, valBytes.length);
     Assert.assertNotNull(msg1.getKey());
     Assert.assertEquals(new String(msg1.getKey()), "key");
     Assert.assertNotNull(msg1.getValue());
-    Assert.assertEquals(new String(msg1.getValue()), "value");
+    Assert.assertEquals(new String(msg1.getValue()), value);
     Assert.assertNull(msg1.getMetadata());
+
+    StreamMessage<String> msg2 = new StreamMessage<>("key".getBytes(StandardCharsets.UTF_8), value, null,
+        value.length());
+    Assert.assertNotNull(msg2.getKey());
+    Assert.assertEquals(new String(msg2.getKey()), "key");
+    Assert.assertNotNull(msg2.getValue());
+    Assert.assertEquals(msg2.getValue(), value);
+    Assert.assertNull(msg2.getMetadata());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org