You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/29 16:12:50 UTC

[pinot] branch master updated: Message batch ingestion lag fix (#10983)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9374964ad4 Message batch ingestion lag fix (#10983)
9374964ad4 is described below

commit 9374964ad4b5797329333157fcbb1de56cc66657
Author: summerhasama-stripe <13...@users.noreply.github.com>
AuthorDate: Thu Jun 29 12:12:42 2023 -0400

    Message batch ingestion lag fix (#10983)
    
    * add back stream message constructor with 1 arg
    * fix bug with metadata being set to null in code trying to ensure backward compatibility
    * Assert not null on stream message metadata
    
    ---------
    
    Co-authored-by: Priyen Patel <pr...@stripe.com>
    Co-authored-by: Rong Rong <ro...@apache.org>
---
 .../stream/kafka20/KafkaPartitionLevelConsumerTest.java     | 13 ++++++++++---
 .../main/java/org/apache/pinot/spi/stream/MessageBatch.java |  6 +++---
 .../java/org/apache/pinot/spi/stream/StreamMessage.java     |  6 +++++-
 .../org/apache/pinot/spi/stream/StreamMessageMetadata.java  |  4 ++++
 .../apache/pinot/spi/stream/StreamDataDecoderImplTest.java  |  4 ++--
 .../java/org/apache/pinot/spi/stream/StreamMessageTest.java |  2 +-
 6 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6c85f913f5..e783e091ae 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMessage;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -281,7 +282,9 @@ public class KafkaPartitionLevelConsumerTest {
           consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch1.getMessageCount(), 500);
       for (int i = 0; i < batch1.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
+        StreamMessage streamMessage = batch1.getStreamMessage(i);
+        Assert.assertNotNull(streamMessage.getMetadata());
+        final byte[] msg = (byte[]) streamMessage.getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + i);
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
@@ -290,7 +293,9 @@ public class KafkaPartitionLevelConsumerTest {
           consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch2.getMessageCount(), 500);
       for (int i = 0; i < batch2.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue();
+        StreamMessage streamMessage = batch2.getStreamMessage(i);
+        Assert.assertNotNull(streamMessage.getMetadata());
+        final byte[] msg = (byte[]) streamMessage.getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
@@ -298,7 +303,9 @@ public class KafkaPartitionLevelConsumerTest {
       final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000);
       Assert.assertEquals(batch3.getMessageCount(), 25);
       for (int i = 0; i < batch3.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
+        StreamMessage streamMessage = batch3.getStreamMessage(i);
+        Assert.assertNotNull(streamMessage.getMetadata());
+        final byte[] msg = (byte[]) streamMessage.getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
         Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 7ae4226e47..8bde04aed4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -57,12 +57,12 @@ public interface MessageBatch<T> {
   }
 
   default StreamMessage<T> getStreamMessage(int index) {
-    return new LegacyStreamMessage(getMessageBytesAtIndex(index));
+    return new LegacyStreamMessage(getMessageBytesAtIndex(index), (StreamMessageMetadata) getMetadataAtIndex(index));
   }
 
   class LegacyStreamMessage extends StreamMessage {
-    public LegacyStreamMessage(byte[] value) {
-      super(value, value.length);
+    public LegacyStreamMessage(byte[] value, StreamMessageMetadata metadata) {
+      super(value, value.length, metadata);
     }
   }
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
index e626dc8106..17f66b6a5a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
@@ -52,7 +52,11 @@ public class StreamMessage<T> {
   }
 
   public StreamMessage(T value, int length) {
-    this(null, value, null, length);
+    this(value, length, null);
+  }
+
+  public StreamMessage(T value, int length, @Nullable StreamMessageMetadata metadata) {
+    this(null, value, metadata, length);
   }
 
   public T getValue() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index ac67249441..557069a581 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -34,6 +34,10 @@ public class StreamMessageMetadata implements RowMetadata {
   private final GenericRow _headers;
   private final Map<String, String> _metadata;
 
+  public StreamMessageMetadata(long recordIngestionTimeMs) {
+    this(recordIngestionTimeMs, Long.MIN_VALUE, null, Collections.emptyMap());
+  }
+
   public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
     this(recordIngestionTimeMs, Long.MIN_VALUE, headers, Collections.emptyMap());
   }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index 56bfb9b97b..9aaf4bc386 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -41,7 +41,7 @@ public class StreamDataDecoderImplTest {
     messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
     String value = "Alice";
     StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
-        value.getBytes(StandardCharsets.UTF_8).length);
+        value.getBytes(StandardCharsets.UTF_8).length, null);
     StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
     Assert.assertNotNull(result);
     Assert.assertNull(result.getException());
@@ -87,7 +87,7 @@ public class StreamDataDecoderImplTest {
     messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
     String value = "Alice";
     StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
-        value.getBytes(StandardCharsets.UTF_8).length);
+        value.getBytes(StandardCharsets.UTF_8).length, null);
     StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
     Assert.assertNotNull(result);
     Assert.assertNotNull(result.getException());
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
index 10e5087493..d0d028709f 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
@@ -29,7 +29,7 @@ public class StreamMessageTest {
   public void testAllowNullKeyAndMetadata() {
     String value = "hello";
     byte[] valBytes = value.getBytes(StandardCharsets.UTF_8);
-    StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length);
+    StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length, null);
     Assert.assertNull(msg.getKey());
     Assert.assertNull(msg.getMetadata());
     Assert.assertEquals(new String(msg.getValue()), value);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org