You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:27 UTC

[incubator-pinot] 21/23: Add isEndOfPartition check in checkpoints

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8d0248a03bbb3c4d42cd390e7c3012a763e9786b
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 24 17:58:40 2020 +0530

    Add isEndOfPartition check in checkpoints
---
 .../pinot/plugin/stream/kinesis/KinesisCheckpoint.java       | 12 +++++++++++-
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  | 10 +++++++++-
 .../main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 027b789..54e26d0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,11 +22,22 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
+  Boolean _isEndOfPartition = false;
 
   public KinesisCheckpoint(String sequenceNumber) {
     _sequenceNumber = sequenceNumber;
   }
 
+  public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
+    _sequenceNumber = sequenceNumber;
+    _isEndOfPartition = isEndOfPartition;
+  }
+
+  @Override
+  public boolean isEndOfPartition() {
+    return _isEndOfPartition;
+  }
+
   public String getSequenceNumber() {
     return _sequenceNumber;
   }
@@ -38,7 +49,6 @@ public class KinesisCheckpoint implements Checkpoint {
 
   @Override
   public KinesisCheckpoint deserialize(byte[] blob) {
-    //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index abbc753..336468a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -95,6 +95,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       }
 
       String nextStartSequenceNumber = null;
+      boolean isEndOfShard = false;
 
       while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
@@ -114,14 +115,21 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           }
         }
 
+        if(getRecordsResponse.hasChildShards()){
+          //This statement returns true only when end of current shard has reached.
+          isEndOfShard = true;
+          break;
+        }
+
         shardIterator = getRecordsResponse.nextShardIterator();
+
       }
 
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       }
 
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
index 030fe4e..0195684 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream.v2;
 
 public interface Checkpoint {
+  boolean isEndOfPartition();
   byte[] serialize();
   Checkpoint deserialize(byte[] blob);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org