You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/10/09 20:17:18 UTC
[pinot] branch master updated: Make StreamMessage generic (#9544)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0d69ae1157 Make StreamMessage generic (#9544)
0d69ae1157 is described below
commit 0d69ae11570ef15fa663d750e7954838252a15ec
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Sun Oct 9 13:17:12 2022 -0700
Make StreamMessage generic (#9544)
---
.../plugin/stream/kafka20/KafkaMessageBatch.java | 6 +++---
.../kafka20/KafkaPartitionLevelConsumer.java | 6 +++---
.../plugin/stream/kafka20/KafkaStreamMessage.java | 2 +-
.../kafka20/KafkaPartitionLevelConsumerTest.java | 12 ++++++------
.../org/apache/pinot/spi/stream/MessageBatch.java | 4 ++--
.../pinot/spi/stream/StreamDataDecoderImpl.java | 4 ++--
.../org/apache/pinot/spi/stream/StreamMessage.java | 18 ++++++++++++------
.../spi/stream/StreamDataDecoderImplTest.java | 11 +++++++----
.../apache/pinot/spi/stream/StreamMessageTest.java | 22 +++++++++++++++++-----
9 files changed, 53 insertions(+), 32 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 4fd01562cc..1852c8bdc8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -27,8 +27,8 @@ import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
- private final List<StreamMessage> _messageList;
+public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
+ private final List<StreamMessage<byte[]>> _messageList;
private final int _unfilteredMessageCount;
private final long _lastOffset;
@@ -37,7 +37,7 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
* @param lastOffset the offset of the last message in the batch
* @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
*/
- public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage> batch) {
+ public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) {
_messageList = batch;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 59ee1c0eab..e7d272041c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -46,14 +46,14 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
}
@Override
- public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+ public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
return fetchMessages(startOffset, endOffset, timeoutMillis);
}
- public MessageBatch<StreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+ public MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
endOffset, timeoutMillis);
@@ -61,7 +61,7 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
- List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
+ List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
long lastOffset = startOffset;
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
String key = messageAndOffset.key();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
index 8124bea53d..f67dcf3f7d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
public class KafkaStreamMessage extends StreamMessage {
public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
- super(key, value, metadata);
+ super(key, value, metadata, value.length);
}
public long getNextOffset() {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 0a2f7d2f5f..6c85f913f5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -281,7 +281,7 @@ public class KafkaPartitionLevelConsumerTest {
consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
Assert.assertEquals(batch1.getMessageCount(), 500);
for (int i = 0; i < batch1.getMessageCount(); i++) {
- final byte[] msg = batch1.getStreamMessage(i).getValue();
+ final byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
@@ -290,7 +290,7 @@ public class KafkaPartitionLevelConsumerTest {
consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
Assert.assertEquals(batch2.getMessageCount(), 500);
for (int i = 0; i < batch2.getMessageCount(); i++) {
- final byte[] msg = batch2.getStreamMessage(i).getValue();
+ final byte[] msg = (byte[]) batch2.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
@@ -298,7 +298,7 @@ public class KafkaPartitionLevelConsumerTest {
final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000);
Assert.assertEquals(batch3.getMessageCount(), 25);
for (int i = 0; i < batch3.getMessageCount(); i++) {
- final byte[] msg = batch3.getStreamMessage(i).getValue();
+ final byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
Assert.assertNotNull(batch1.getMetadataAtIndex(i));
}
@@ -388,7 +388,7 @@ public class KafkaPartitionLevelConsumerTest {
MessageBatch batch1 = consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(400), 10000);
Assert.assertEquals(batch1.getMessageCount(), 200);
for (int i = 0; i < batch1.getMessageCount(); i++) {
- byte[] msg = batch1.getStreamMessage(i).getValue();
+ byte[] msg = (byte[]) batch1.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
}
Assert.assertEquals(batch1.getOffsetOfNextBatch().toString(), "400");
@@ -400,7 +400,7 @@ public class KafkaPartitionLevelConsumerTest {
MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(201), new LongMsgOffset(401), 10000);
Assert.assertEquals(batch3.getMessageCount(), 200);
for (int i = 0; i < batch3.getMessageCount(); i++) {
- byte[] msg = batch3.getStreamMessage(i).getValue();
+ byte[] msg = (byte[]) batch3.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (i + 201));
}
Assert.assertEquals(batch3.getOffsetOfNextBatch().toString(), "401");
@@ -408,7 +408,7 @@ public class KafkaPartitionLevelConsumerTest {
MessageBatch batch4 = consumer.fetchMessages(new LongMsgOffset(0), null, 10000);
Assert.assertEquals(batch4.getMessageCount(), 500);
for (int i = 0; i < batch4.getMessageCount(); i++) {
- byte[] msg = batch4.getStreamMessage(i).getValue();
+ byte[] msg = (byte[]) batch4.getStreamMessage(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
}
Assert.assertEquals(batch4.getOffsetOfNextBatch().toString(), "700");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 0e80df0234..d22a46af67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -55,13 +55,13 @@ public interface MessageBatch<T> {
return (byte[]) getMessageAtIndex(index);
}
- default StreamMessage getStreamMessage(int index) {
+ default StreamMessage<T> getStreamMessage(int index) {
return new LegacyStreamMessage(getMessageBytesAtIndex(index));
}
class LegacyStreamMessage extends StreamMessage {
public LegacyStreamMessage(byte[] value) {
- super(value);
+ super(value, value.length);
}
}
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 3e69dbca03..97958b92d3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -44,13 +44,13 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
try {
_reuse.clear();
- GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, _reuse);
+ GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getLength(), _reuse);
if (row != null) {
if (message.getKey() != null) {
row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8));
}
RowMetadata metadata = message.getMetadata();
- if (metadata != null) {
+ if (metadata != null && metadata.getHeaders() != null) {
metadata.getHeaders().getFieldToValueMap()
.forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
index 6eaf099c12..e626dc8106 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
@@ -38,25 +38,31 @@ import javax.annotation.Nullable;
* Additionally, the pinot table schema should refer these fields. Otherwise, even though the fields are extracted,
* they will not materialize in the pinot table.
*/
-public class StreamMessage {
+public class StreamMessage<T> {
private final byte[] _key;
- private final byte[] _value;
+ private final T _value;
protected final StreamMessageMetadata _metadata;
+ int _length = -1;
- public StreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
+ public StreamMessage(@Nullable byte[] key, T value, @Nullable StreamMessageMetadata metadata, int length) {
_key = key;
_value = value;
_metadata = metadata;
+ _length = length;
}
- public StreamMessage(byte[] value) {
- this(null, value, null);
+ public StreamMessage(T value, int length) {
+ this(null, value, null, length);
}
- public byte[] getValue() {
+ public T getValue() {
return _value;
}
+ public int getLength() {
+ return _length;
+ }
+
@Nullable
public StreamMessageMetadata getMetadata() {
return _metadata;
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index 0483b7d1ac..56bfb9b97b 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -40,7 +40,8 @@ public class StreamDataDecoderImplTest {
TestDecoder messageDecoder = new TestDecoder();
messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
String value = "Alice";
- StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+ StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
+ value.getBytes(StandardCharsets.UTF_8).length);
StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
Assert.assertNotNull(result);
Assert.assertNull(result.getException());
@@ -62,8 +63,9 @@ public class StreamDataDecoderImplTest {
headers.putValue(AGE_HEADER_KEY, 3);
Map<String, String> recordMetadata = Collections.singletonMap(SEQNO_RECORD_METADATA, "1");
StreamMessageMetadata metadata = new StreamMessageMetadata(1234L, headers, recordMetadata);
- StreamMessage message = new StreamMessage(key.getBytes(StandardCharsets.UTF_8),
- value.getBytes(StandardCharsets.UTF_8), metadata);
+ byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+ StreamMessage<byte[]> message =
+ new StreamMessage(key.getBytes(StandardCharsets.UTF_8), valueBytes, metadata, value.length());
StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
Assert.assertNotNull(result);
@@ -84,7 +86,8 @@ public class StreamDataDecoderImplTest {
ThrowingDecoder messageDecoder = new ThrowingDecoder();
messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
String value = "Alice";
- StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+ StreamMessage<byte[]> message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8),
+ value.getBytes(StandardCharsets.UTF_8).length);
StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
Assert.assertNotNull(result);
Assert.assertNotNull(result.getException());
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
index 926761109b..10e5087493 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
@@ -27,17 +27,29 @@ public class StreamMessageTest {
@Test
public void testAllowNullKeyAndMetadata() {
- StreamMessage msg = new StreamMessage("hello".getBytes(StandardCharsets.UTF_8));
+ String value = "hello";
+ byte[] valBytes = value.getBytes(StandardCharsets.UTF_8);
+ StreamMessage<byte[]> msg = new StreamMessage(valBytes, valBytes.length);
Assert.assertNull(msg.getKey());
Assert.assertNull(msg.getMetadata());
- Assert.assertEquals(new String(msg.getValue()), "hello");
+ Assert.assertEquals(new String(msg.getValue()), value);
- StreamMessage msg1 = new StreamMessage("key".getBytes(StandardCharsets.UTF_8),
- "value".getBytes(StandardCharsets.UTF_8), null);
+ value = "value";
+ valBytes = value.getBytes(StandardCharsets.UTF_8);
+ StreamMessage<byte[]> msg1 =
+ new StreamMessage("key".getBytes(StandardCharsets.UTF_8), valBytes, null, valBytes.length);
Assert.assertNotNull(msg1.getKey());
Assert.assertEquals(new String(msg1.getKey()), "key");
Assert.assertNotNull(msg1.getValue());
- Assert.assertEquals(new String(msg1.getValue()), "value");
+ Assert.assertEquals(new String(msg1.getValue()), value);
Assert.assertNull(msg1.getMetadata());
+
+ StreamMessage<String> msg2 = new StreamMessage<>("key".getBytes(StandardCharsets.UTF_8), value, null,
+ value.length());
+ Assert.assertNotNull(msg2.getKey());
+ Assert.assertEquals(new String(msg2.getKey()), "key");
+ Assert.assertNotNull(msg2.getValue());
+ Assert.assertEquals(msg2.getValue(), value);
+ Assert.assertNull(msg2.getMetadata());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org