You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:22 UTC
[incubator-pinot] 32/47: Return message batch instead of list in
the fetch result
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit c5c42d497320a3e7aedca4a7e1c43808e69222f9
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 31 11:24:42 2020 +0530
Return message batch instead of list in the fetch result
---
.../plugin/stream/kinesis/KinesisFetchResult.java | 7 +--
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 52 ++++++++++++++++++++++
.../plugin/stream/kinesis/KinesisConsumerTest.java | 7 +--
.../apache/pinot/spi/stream/v2/FetchResult.java | 3 +-
4 files changed, 62 insertions(+), 7 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index aedcd5d..39561f3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.FetchResult;
import software.amazon.awssdk.services.kinesis.model.Record;
-public class KinesisFetchResult implements FetchResult<Record> {
+public class KinesisFetchResult implements FetchResult<byte[]> {
private final KinesisCheckpoint _kinesisCheckpoint;
private final List<Record> _recordList;
@@ -40,7 +41,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
}
@Override
- public List<Record> getMessages() {
- return _recordList;
+ public KinesisRecordsBatch getMessages() {
+ return new KinesisRecordsBatch(_recordList);
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
new file mode 100644
index 0000000..ed51f8f
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -0,0 +1,52 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisRecordsBatch implements MessageBatch<byte[]> {
+ private List<Record> _recordList;
+
+ public KinesisRecordsBatch(List<Record> recordList) {
+ _recordList = recordList;
+ }
+
+ @Override
+ public int getMessageCount() {
+ return _recordList.size();
+ }
+
+ @Override
+ public byte[] getMessageAtIndex(int index) {
+ return _recordList.get(index).data().asByteArray();
+ }
+
+ @Override
+ public int getMessageOffsetAtIndex(int index) {
+ //TODO: Doesn't translate to offset. Needs to be replaced.
+ return _recordList.get(index).hashCode();
+ }
+
+ @Override
+ public int getMessageLengthAtIndex(int index) {
+ return _recordList.get(index).data().asByteArray().length;
+ }
+
+ @Override
+ public RowMetadata getMetadataAtIndex(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getNextStreamMessageOffsetAtIndex(int index) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 17691c4..6f660f7 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -48,10 +48,11 @@ public class KinesisConsumerTest {
KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
- List<Record> list = fetchResult.getMessages();
+ KinesisRecordsBatch list = fetchResult.getMessages();
+ int n = list.getMessageCount();
- for (Record record : list) {
- System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+ for (int i=0;i<n;i++) {
+ System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
}
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
index 9d14473..2188ac9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -19,10 +19,11 @@
package org.apache.pinot.spi.stream.v2;
import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
public interface FetchResult<T> {
Checkpoint getLastCheckpoint();
- List<T> getMessages();
+ MessageBatch<T> getMessages();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org