You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:10 UTC
[incubator-pinot] 04/23: Add kinesis code to handle offsets
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 76cfcf1816d3d36974841081346edb8c95bf73fc
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 13:57:25 2020 +0530
Add kinesis code to handle offsets
---
.../plugin/stream/kinesis/KinesisCheckpoint.java | 13 ++++---
.../plugin/stream/kinesis/KinesisConsumer.java | 42 +++++++++++++++++++---
.../stream/kinesis/KinesisConsumerFactory.java | 36 +++++++++++++++++++
.../plugin/stream/kinesis/KinesisFetchResult.java | 11 +++---
.../kinesis/KinesisPartitionGroupMetadataMap.java | 31 ++++++++++++++++
.../stream/kinesis/KinesisShardMetadata.java | 5 ++-
6 files changed, 121 insertions(+), 17 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index a330e78..77f790b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -1,23 +1,22 @@
package org.apache.pinot.plugin.stream.kinesis;
import org.apache.pinot.spi.stream.v2.Checkpoint;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
public class KinesisCheckpoint implements Checkpoint {
- String _shardIterator;
+ String _sequenceNumber;
- public KinesisCheckpoint(String shardIterator){
- _shardIterator = shardIterator;
+ public KinesisCheckpoint(String sequenceNumber){
+ _sequenceNumber = sequenceNumber;
}
- public String getShardIterator() {
- return _shardIterator;
+ public String getSequenceNumber() {
+ return _sequenceNumber;
}
@Override
public byte[] serialize() {
- return _shardIterator.getBytes();
+ return _sequenceNumber.getBytes();
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 251d831..dc44079 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -1,19 +1,26 @@
package org.apache.pinot.plugin.stream.kinesis;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
import org.apache.pinot.spi.stream.v2.FetchResult;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+ String _stream;
//TODO: Fetch AWS region from Stream Config.
- public KinesisConsumer(String awsRegion) {
+ public KinesisConsumer(String stream, String awsRegion) {
super(awsRegion);
+ _stream = stream;
}
@Override
@@ -21,18 +28,43 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
- String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator();
+ String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
+ String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
- GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+ GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
+ ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
+
+ String shardIterator = getShardIteratorResponse.shardIterator();
+ GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+ //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception.
if(!getRecordsResponse.hasRecords()){
- return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList());
+ return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList());
+ }
+
+ List<Record> recordList = new ArrayList<>();
+ recordList.addAll(getRecordsResponse.records());
+
+ String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+ while(kinesisNextShardIterator != null){
+ getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
+ getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+ if(getRecordsResponse.hasRecords()){
+ recordList.addAll(getRecordsResponse.records());
+ nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+ }
+
+ if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) {
+ nextStartSequenceNumber = kinesisEndSequenceNumber;
+ break;
+ }
+ kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
}
- KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator,
+ KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber,
getRecordsResponse.records());
return kinesisFetchResult;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
new file mode 100644
index 0000000..6bd1e3a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
+import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+
+
+/**
+ * {@link StreamConsumerFactoryV2} implementation for Kinesis.
+ *
+ * <p>The stream name is taken from {@link StreamConfig#getTopicName()} and the AWS region from the
+ * {@code aws-region} entry of the stream config map. {@link #init(StreamConfig)} must be called
+ * before any other method, since every other method dereferences the stored stream config.
+ */
+public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
+  /** Key of the stream config map entry that holds the AWS region. */
+  private static final String AWS_REGION = "aws-region";
+
+  // NOTE(review): "us-central-1" does not look like an existing AWS region name; the value is kept
+  // unchanged to preserve current behavior - confirm the intended default region.
+  private static final String DEFAULT_AWS_REGION = "us-central-1";
+
+  private StreamConfig _streamConfig;
+
+  /**
+   * Stores the stream config used by all subsequent calls on this factory.
+   *
+   * @param streamConfig stream config providing the stream name and the config map
+   */
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  /**
+   * Builds a fresh {@link KinesisPartitionGroupMetadataMap} for the configured stream and region.
+   *
+   * @param currentPartitionGroupsMetadata currently ignored
+   * @return a newly constructed metadata map for the configured stream
+   */
+  @Override
+  public PartitionGroupMetadataMap getPartitionGroupsMetadata(
+      PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), getAwsRegion());
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public SegmentNameGenerator getSegmentNameGenerator() {
+    return null;
+  }
+
+  /**
+   * Creates a {@link KinesisConsumer} for the configured stream and region.
+   *
+   * @param metadata currently ignored
+   * @return a new consumer for the configured stream
+   */
+  @Override
+  public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
+    return new KinesisConsumer(_streamConfig.getTopicName(), getAwsRegion());
+  }
+
+  /** Returns the configured AWS region, or {@link #DEFAULT_AWS_REGION} when the key is absent. */
+  private String getAwsRegion() {
+    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 5ef4e30..dc8e764 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -1,16 +1,19 @@
package org.apache.pinot.plugin.stream.kinesis;
+import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.FetchResult;
import software.amazon.awssdk.services.kinesis.model.Record;
-public class KinesisFetchResult implements FetchResult {
- private String _nextShardIterator;
+public class KinesisFetchResult implements FetchResult<Record> {
+ private final String _nextShardIterator;
+ private final List<Record> _recordList;
public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
_nextShardIterator = nextShardIterator;
+ _recordList = recordList;
}
@Override
@@ -19,7 +22,7 @@ public class KinesisFetchResult implements FetchResult {
}
@Override
- public byte[] getMessages() {
- return new byte[0];
+ public List<Record> getMessages() {
+ return _recordList;
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
new file mode 100644
index 0000000..bc3fef2
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -0,0 +1,31 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+/**
+ * {@link PartitionGroupMetadataMap} for a Kinesis stream, keyed by shard id.
+ *
+ * <p>The map is populated once, in the constructor, by listing the shards of the stream.
+ */
+public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
+  private final Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>();
+
+  /**
+   * Lists every shard of {@code stream} and records one {@link KinesisShardMetadata} per shard,
+   * with its end checkpoint set to the shard's ending sequence number.
+   *
+   * @param stream name of the Kinesis stream whose shards are listed
+   * @param awsRegion AWS region passed to the connection handler
+   */
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+    super(awsRegion);
+    String nextToken = null;
+    do {
+      // ListShards is paginated: the first request names the stream, follow-up requests must carry
+      // only the token returned by the previous page (stream name and token are mutually exclusive).
+      ListShardsRequest.Builder requestBuilder = ListShardsRequest.builder();
+      if (nextToken == null) {
+        requestBuilder.streamName(stream);
+      } else {
+        requestBuilder.nextToken(nextToken);
+      }
+      ListShardsResponse listShardsResponse = _kinesisClient.listShards(requestBuilder.build());
+      List<Shard> shardList = listShardsResponse.shards();
+      for(Shard shard : shardList){
+        // NOTE(review): open shards presumably have no ending sequence number, so this may be
+        // null - confirm that KinesisCheckpoint and its consumers tolerate a null value.
+        String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+        KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
+        shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+        _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+      }
+      nextToken = listShardsResponse.nextToken();
+    } while (nextToken != null);
+  }
+
+  /**
+   * Returns the shard-id to metadata mapping built at construction time.
+   *
+   * @return the internal (mutable) map; it is not copied
+   */
+  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
+    return _stringPartitionGroupMetadataMap;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 07ede73..d50d821 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -11,8 +12,10 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
Checkpoint _endCheckpoint;
public KinesisShardMetadata(String shardId, String streamName) {
- GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build());
+ GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
+ ShardIteratorType.LATEST).streamName(streamName).build());
_startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
+ _endCheckpoint = null;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org