You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:08 UTC

[incubator-pinot] 18/47: Move shardId out of checkpoint to partition group metadata

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7a4fccc3ad68f72f363f1663f6956c4b2aa6cc78
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:25:13 2020 +0530

    Move shardId out of checkpoint to partition group metadata
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java    | 14 ++------------
 .../plugin/stream/kinesis/KinesisConsumer.java      | 21 +++++++++------------
 .../stream/kinesis/KinesisConsumerFactory.java      |  2 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java   |  4 +++-
 .../plugin/stream/kinesis/KinesisShardMetadata.java |  5 ++---
 5 files changed, 17 insertions(+), 29 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 8448665..aa80b17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -4,11 +4,9 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _shardId;
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String shardId, String sequenceNumber){
-    _shardId = shardId;
+  public KinesisCheckpoint(String sequenceNumber){
     _sequenceNumber = sequenceNumber;
   }
 
@@ -16,14 +14,6 @@ public class KinesisCheckpoint implements Checkpoint {
     return _sequenceNumber;
   }
 
-  public String getShardId() {
-    return _shardId;
-  }
-
-  public void setShardId(String shardId) {
-    _shardId = shardId;
-  }
-
   @Override
   public byte[] serialize() {
     return _sequenceNumber.getBytes();
@@ -32,7 +22,7 @@ public class KinesisCheckpoint implements Checkpoint {
   @Override
   public Checkpoint deserialize(byte[] blob) {
     //TODO: Implement SerDe
-    return new KinesisCheckpoint("", new String(blob));
+    return new KinesisCheckpoint(new String(blob));
   }
 
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7bc1006..d896d67 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -7,6 +7,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -18,18 +19,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
   Integer _maxRecords;
+  String _shardId;
 
-  //TODO: Fetch AWS region from  Stream Config.
-  public KinesisConsumer(String stream, String awsRegion) {
-    super(stream, awsRegion);
-    _stream = stream;
-    _maxRecords = 20;
-  }
-
-  public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) {
-    super(stream, awsRegion);
+  public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
+    super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
     _stream = stream;
     _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+    KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
+    _shardId = kinesisShardMetadata.getShardId();
   }
 
   @Override
@@ -73,7 +70,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
     }
 
-    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber);
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
     KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
         recordList);
 
@@ -86,11 +83,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     if(kinesisStartCheckpoint.getSequenceNumber() != null) {
       String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
               .startingSequenceNumber(kinesisStartSequenceNumber).build());
     } else{
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
     }
 
     return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index bdbc348..0608118 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig);
+    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d15804e..700ec3f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -16,9 +16,11 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
     super(stream, awsRegion);
     List<Shard> shardList = getShards();
     for(Shard shard : shardList){
+      String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber));
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
+      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
       _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 693b307..e1d23da 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -15,9 +15,8 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
 
   public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
     super(streamName, awsRegion);
-
-    _startCheckpoint = new KinesisCheckpoint(shardId, null);
-    _endCheckpoint = new KinesisCheckpoint(shardId, null);
+    _startCheckpoint = null;
+    _endCheckpoint = null;
     _shardId = shardId;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org