You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/29 16:12:50 UTC
[pinot] branch master updated: Message batch ingestion lag fix (#10983)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9374964ad4 Message batch ingestion lag fix (#10983)
9374964ad4 is described below
commit 9374964ad4b5797329333157fcbb1de56cc66657
Author: summerhasama-stripe <13...@users.noreply.github.com>
AuthorDate: Thu Jun 29 12:12:42 2023 -0400
Message batch ingestion lag fix (#10983)
* add back stream message constructur with 1 arg
* fix bug with metadata being set to null in code trying to assure backwards incompatability
* Assert not null on stream message metadata
---------
Co-authored-by: Priyen Patel <pr...@stripe.com>
Co-authored-by: Rong Rong <ro...@apache.org>
---
.../stream/kafka20/KafkaPartitionLevelConsumerTest.java | 13 ++++++++++---
.../main/java/org/apache/pinot/spi/stream/MessageBatch.java | 6 +++---
.../java/org/apache/pinot/spi/stream/StreamMessage.java | 6 +++++-
.../org/apache/pinot/spi/stream/StreamMessageMetadata.java | 4 ++++
.../apache/pinot/spi/stream/StreamDataDecoderImplTest.java | 4 ++--
.../java/org/apache/pinot/spi/stream/StreamMessageTest.java | 2 +-
6 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6c85f913f5..e783e091ae 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMessage;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -281,7 +282,9 @@ public class KafkaPartitionLevelConsumerTest {
consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
Assert.assertEquals(batch1.getMessageCount(), 500);
for (int i = 0; i < batch1.getMessageCount(); i++) {
- final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
+ StreamMessage streamMessage = batch1.getStreamMessage(i);
+ Assert.assertNotNull(streamMessage.getMetadata());
+ final byte[] msg = (byte[]) streamMessage.getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
@@ -290,7 +293,9 @@ public class KafkaPartitionLevelConsumerTest {
consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
Assert.assertEquals(batch2.getMessageCount(), 500);
for (int i = 0; i < batch2.getMessageCount(); i++) {
- final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue();
+ StreamMessage streamMessage = batch2.getStreamMessage(i);
+ Assert.assertNotNull(streamMessage.getMetadata());
+ final byte[] msg = (byte[]) streamMessage.getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
@@ -298,7 +303,9 @@ public class KafkaPartitionLevelConsumerTest {
final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000);
Assert.assertEquals(batch3.getMessageCount(), 25);
for (int i = 0; i < batch3.getMessageCount(); i++) {
- final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
+ StreamMessage streamMessage = batch3.getStreamMessage(i);
+ Assert.assertNotNull(streamMessage.getMetadata());
+ final byte[] msg = (byte[]) streamMessage.getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 7ae4226e47..8bde04aed4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -57,12 +57,12 @@ public interface MessageBatch<T> {
}
default StreamMessage<T> getStreamMessage(int index) {
- return new LegacyStreamMessage(getMessageBytesAtIndex(index));
+ return new LegacyStreamMessage(getMessageBytesAtIndex(index), (StreamMessageMetadata) getMetadataAtIndex(index));
}
class LegacyStreamMessage extends StreamMessage {
- public LegacyStreamMessage(byte[] value) {
- super(value, value.length);
+ public LegacyStreamMessage(byte[] value, StreamMessageMetadata metadata) {
+ super(value, value.length, metadata);
}
}
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
index e626dc8106..17f66b6a5a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
@@ -52,7 +52,11 @@ public class StreamMessage<T> {
}
public StreamMessage(T value, int length) {
- this(null, value, null, length);
+ this(value, length, null);
+ }
+
+ public StreamMessage(T value, int length, @Nullable StreamMessageMetadata metadata) {
+ this(null, value, metadata, length);
}
public T getValue() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index ac67249441..557069a581 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -34,6 +34,10 @@ public class StreamMessageMetadata implements RowMetadata {
private final GenericRow _headers;
private final Map<String, String> _metadata;
+ public StreamMessageMetadata(long recordIngestionTimeMs) {
+ this(recordIngestionTimeMs, Long.MIN_VALUE, null, Collections.emptyMap());
+ }
+
public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
this(recordIngestionTimeMs, Long.MIN_VALUE, headers, Collections.emptyMap());
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index 56bfb9b97b..9aaf4bc386 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -41,7 +41,7 @@ public class StreamDataDecoderImplTest {
messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
String value = "Alice";
StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
- value.getBytes(StandardCharsets.UTF_8).length);
+ value.getBytes(StandardCharsets.UTF_8).length, null);
StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
Assert.assertNotNull(result);
Assert.assertNull(result.getException());
@@ -87,7 +87,7 @@ public class StreamDataDecoderImplTest {
messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
String value = "Alice";
StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
- value.getBytes(StandardCharsets.UTF_8).length);
+ value.getBytes(StandardCharsets.UTF_8).length, null);
StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
Assert.assertNotNull(result);
Assert.assertNotNull(result.getException());
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
index 10e5087493..d0d028709f 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
@@ -29,7 +29,7 @@ public class StreamMessageTest {
public void testAllowNullKeyAndMetadata() {
String value = "hello";
byte[] valBytes = value.getBytes(StandardCharsets.UTF_8);
- StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length);
+ StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length, null);
Assert.assertNull(msg.getKey());
Assert.assertNull(msg.getMetadata());
Assert.assertEquals(new String(msg.getValue()), value);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org