You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:27 UTC
[incubator-pinot] 21/23: Add isEndOfPartition check in checkpoints
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8d0248a03bbb3c4d42cd390e7c3012a763e9786b
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 24 17:58:40 2020 +0530
Add isEndOfPartition check in checkpoints
---
.../pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 12 +++++++++++-
.../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 10 +++++++++-
.../main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java | 1 +
3 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 027b789..54e26d0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,11 +22,22 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
public class KinesisCheckpoint implements Checkpoint {
String _sequenceNumber;
+ Boolean _isEndOfPartition = false;
public KinesisCheckpoint(String sequenceNumber) {
_sequenceNumber = sequenceNumber;
}
+ public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
+ _sequenceNumber = sequenceNumber;
+ _isEndOfPartition = isEndOfPartition;
+ }
+
+ @Override
+ public boolean isEndOfPartition() {
+ return _isEndOfPartition;
+ }
+
public String getSequenceNumber() {
return _sequenceNumber;
}
@@ -38,7 +49,6 @@ public class KinesisCheckpoint implements Checkpoint {
@Override
public KinesisCheckpoint deserialize(byte[] blob) {
- //TODO: Implement SerDe
return new KinesisCheckpoint(new String(blob));
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index abbc753..336468a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -95,6 +95,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
String nextStartSequenceNumber = null;
+ boolean isEndOfShard = false;
while (shardIterator != null) {
GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
@@ -114,14 +115,21 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
}
+ if(getRecordsResponse.hasChildShards()){
+ //This statement returns true only when end of current shard has reached.
+ isEndOfShard = true;
+ break;
+ }
+
shardIterator = getRecordsResponse.nextShardIterator();
+
}
if (nextStartSequenceNumber == null && recordList.size() > 0) {
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
}
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
return kinesisFetchResult;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
index 030fe4e..0195684 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream.v2;
public interface Checkpoint {
+ boolean isEndOfPartition();
byte[] serialize();
Checkpoint deserialize(byte[] blob);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org