You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:22 UTC

[incubator-pinot] 32/47: Return message batch instead of list in the fetch result

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c5c42d497320a3e7aedca4a7e1c43808e69222f9
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 31 11:24:42 2020 +0530

    Return message batch instead of list in the fetch result
---
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  7 +--
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 52 ++++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisConsumerTest.java |  7 +--
 .../apache/pinot/spi/stream/v2/FetchResult.java    |  3 +-
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index aedcd5d..39561f3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
-public class KinesisFetchResult implements FetchResult<Record> {
+public class KinesisFetchResult implements FetchResult<byte[]> {
   private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
@@ -40,7 +41,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
   }
 
   @Override
-  public List<Record> getMessages() {
-    return _recordList;
+  public KinesisRecordsBatch getMessages() {
+    return new KinesisRecordsBatch(_recordList);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
new file mode 100644
index 0000000..ed51f8f
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -0,0 +1,52 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisRecordsBatch implements MessageBatch<byte[]> {
+  private List<Record> _recordList;
+
+  public KinesisRecordsBatch(List<Record> recordList) {
+    _recordList = recordList;
+  }
+
+  @Override
+  public int getMessageCount() {
+    return _recordList.size();
+  }
+
+  @Override
+  public byte[] getMessageAtIndex(int index) {
+    return _recordList.get(index).data().asByteArray();
+  }
+
+  @Override
+  public int getMessageOffsetAtIndex(int index) {
+    //TODO: Doesn't translate to offset. Needs to be replaced.
+    return _recordList.get(index).hashCode();
+  }
+
+  @Override
+  public int getMessageLengthAtIndex(int index) {
+    return _recordList.get(index).data().asByteArray().length;
+  }
+
+  @Override
+  public RowMetadata getMetadataAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 17691c4..6f660f7 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -48,10 +48,11 @@ public class KinesisConsumerTest {
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
       KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
 
-      List<Record> list = fetchResult.getMessages();
+      KinesisRecordsBatch list = fetchResult.getMessages();
+      int n = list.getMessageCount();
 
-      for (Record record : list) {
-        System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+      for (int i=0;i<n;i++) {
+        System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
       }
     }
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
index 9d14473..2188ac9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -19,10 +19,11 @@
 package org.apache.pinot.spi.stream.v2;
 
 import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
 
 
 public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  List<T> getMessages();
+  MessageBatch<T> getMessages();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org