You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:17 UTC

[incubator-pinot] 27/47: Refactor: get shard iterator methods

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit be19cf6866235d7bf4ce0a74424ae2378f40c8bc
Author: KKcorps <kh...@gmail.com>
AuthorDate: Mon Dec 21 14:25:25 2020 +0530

    Refactor: get shard iterator methods
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fd48a92..3263f87 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    GetShardIteratorResponse getShardIteratorResponse;
-
     if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
     } else {
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(ShardIteratorType.LATEST).build());
+      return getShardIterator(ShardIteratorType.LATEST, null);
     }
+  }
 
-    return getShardIteratorResponse.shardIterator();
+  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
+    if(sequenceNumber == null){
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(shardIteratorType).build()).shardIterator();
+    }else{
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(shardIteratorType)
+              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org