You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:08 UTC
[incubator-pinot] 18/47: Move shardId out of checkpoint to
partition group metadata
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7a4fccc3ad68f72f363f1663f6956c4b2aa6cc78
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:25:13 2020 +0530
Move shardId out of checkpoint to partition group metadata
---
.../plugin/stream/kinesis/KinesisCheckpoint.java | 14 ++------------
.../plugin/stream/kinesis/KinesisConsumer.java | 21 +++++++++------------
.../stream/kinesis/KinesisConsumerFactory.java | 2 +-
.../kinesis/KinesisPartitionGroupMetadataMap.java | 4 +++-
.../plugin/stream/kinesis/KinesisShardMetadata.java | 5 ++---
5 files changed, 17 insertions(+), 29 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 8448665..aa80b17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -4,11 +4,9 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
public class KinesisCheckpoint implements Checkpoint {
- String _shardId;
String _sequenceNumber;
- public KinesisCheckpoint(String shardId, String sequenceNumber){
- _shardId = shardId;
+ public KinesisCheckpoint(String sequenceNumber){
_sequenceNumber = sequenceNumber;
}
@@ -16,14 +14,6 @@ public class KinesisCheckpoint implements Checkpoint {
return _sequenceNumber;
}
- public String getShardId() {
- return _shardId;
- }
-
- public void setShardId(String shardId) {
- _shardId = shardId;
- }
-
@Override
public byte[] serialize() {
return _sequenceNumber.getBytes();
@@ -32,7 +22,7 @@ public class KinesisCheckpoint implements Checkpoint {
@Override
public Checkpoint deserialize(byte[] blob) {
//TODO: Implement SerDe
- return new KinesisCheckpoint("", new String(blob));
+ return new KinesisCheckpoint(new String(blob));
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7bc1006..d896d67 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -7,6 +7,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
import org.apache.pinot.spi.stream.v2.FetchResult;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -18,18 +19,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
String _stream;
Integer _maxRecords;
+ String _shardId;
- //TODO: Fetch AWS region from Stream Config.
- public KinesisConsumer(String stream, String awsRegion) {
- super(stream, awsRegion);
- _stream = stream;
- _maxRecords = 20;
- }
-
- public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) {
- super(stream, awsRegion);
+ public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
+ super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
_stream = stream;
_maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+ KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
+ _shardId = kinesisShardMetadata.getShardId();
}
@Override
@@ -73,7 +70,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
}
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber);
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
recordList);
@@ -86,11 +83,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
if(kinesisStartCheckpoint.getSequenceNumber() != null) {
String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
getShardIteratorResponse = _kinesisClient.getShardIterator(
- GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.startingSequenceNumber(kinesisStartSequenceNumber).build());
} else{
getShardIteratorResponse = _kinesisClient.getShardIterator(
- GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+ GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
}
return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index bdbc348..0608118 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
@Override
public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
- return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig);
+ return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d15804e..700ec3f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -16,9 +16,11 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
super(stream, awsRegion);
List<Shard> shardList = getShards();
for(Shard shard : shardList){
+ String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
- shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber));
+ shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
+ shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
_stringPartitionGroupMetadataIndex.add(shardMetadata);
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 693b307..e1d23da 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -15,9 +15,8 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
super(streamName, awsRegion);
-
- _startCheckpoint = new KinesisCheckpoint(shardId, null);
- _endCheckpoint = new KinesisCheckpoint(shardId, null);
+ _startCheckpoint = null;
+ _endCheckpoint = null;
_shardId = shardId;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org