You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:17 UTC
[incubator-pinot] 11/23: Add Kinesis config wrapper
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b0eeec6926d43a7769e57e83b345e5a52a1c1221
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:35:18 2020 +0530
Add Kinesis config wrapper
---
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 29 ++++++++
.../plugin/stream/kinesis/KinesisConsumer.java | 78 ++++++++++++----------
.../stream/kinesis/KinesisConsumerFactory.java | 10 ++-
3 files changed, 74 insertions(+), 43 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..01d666a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -0,0 +1,51 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+/**
+ * Read-only wrapper exposing the Kinesis-specific settings of a {@link StreamConfig}.
+ *
+ * <p>Values are looked up in the stream config map on every call; nothing is cached.
+ */
+public class KinesisConfig {
+  private static final String AWS_REGION = "aws-region";
+  private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+
+  // The previous default, "us-central-1", is not an AWS region identifier, so any table that
+  // omitted "aws-region" could never reach Kinesis. Fall back to a region that exists.
+  private static final String DEFAULT_AWS_REGION = "us-east-1";
+  private static final String DEFAULT_MAX_RECORDS = "20";
+
+  private final StreamConfig _streamConfig;
+
+  /**
+   * @param streamConfig stream configuration to read Kinesis settings from
+   */
+  public KinesisConfig(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  /**
+   * @return the Kinesis stream name, taken from the stream config's topic name
+   */
+  public String getStream() {
+    return _streamConfig.getTopicName();
+  }
+
+  /**
+   * @return the value of {@code aws-region}, or {@code us-east-1} when the key is absent
+   */
+  public String getAwsRegion() {
+    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+  }
+
+  /**
+   * @return the value of {@code max-records-to-fetch} parsed as an integer, or 20 when absent
+   * @throws NumberFormatException if the configured value is not a valid integer
+   */
+  public Integer maxRecordsToFetch() {
+    return Integer.parseInt(
+        _streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7670f06..96241d4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -30,71 +30,75 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-
+//TODO: Handle exceptions and timeout
public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
String _stream;
Integer _maxRecords;
String _shardId;
- public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
- super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
- _stream = stream;
- _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+ public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
+ super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+ _stream = kinesisConfig.getStream();
+ _maxRecords = kinesisConfig.maxRecordsToFetch();
KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
_shardId = kinesisShardMetadata.getShardId();
}
@Override
public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
- KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+ try {
+ KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
- String shardIterator = getShardIterator(kinesisStartCheckpoint);
+ String shardIterator = getShardIterator(kinesisStartCheckpoint);
- List<Record> recordList = new ArrayList<>();
+ List<Record> recordList = new ArrayList<>();
- String kinesisEndSequenceNumber = null;
+ String kinesisEndSequenceNumber = null;
- if (end != null) {
- KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
- kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
- }
+ if (end != null) {
+ KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+ kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+ }
- String nextStartSequenceNumber = null;
- Long startTimestamp = System.currentTimeMillis();
+ String nextStartSequenceNumber = null;
+ Long startTimestamp = System.currentTimeMillis();
- while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
- GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
- GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+ while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+ GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+ GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
- if (getRecordsResponse.records().size() > 0) {
- recordList.addAll(getRecordsResponse.records());
- nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+ if (getRecordsResponse.records().size() > 0) {
+ recordList.addAll(getRecordsResponse.records());
+ nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
- if (kinesisEndSequenceNumber != null
- && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
- nextStartSequenceNumber = kinesisEndSequenceNumber;
- break;
- }
+ if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+ nextStartSequenceNumber = kinesisEndSequenceNumber;
+ break;
+ }
- if (recordList.size() >= _maxRecords) {
- break;
+ if (recordList.size() >= _maxRecords) {
+ break;
+ }
}
- }
- shardIterator = getRecordsResponse.nextShardIterator();
- }
+ shardIterator = getRecordsResponse.nextShardIterator();
+ }
- if (nextStartSequenceNumber == null && recordList.size() > 0) {
- nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
- }
+ if (nextStartSequenceNumber == null && recordList.size() > 0) {
+ nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+ }
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
- KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+ KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
- return kinesisFetchResult;
+ return kinesisFetchResult;
+ }catch (KinesisException e){
+ return null;
+ }
}
private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 931fa07..da39aab 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -28,19 +28,17 @@ import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
- private StreamConfig _streamConfig;
- private final String AWS_REGION = "aws-region";
+ private KinesisConfig _kinesisConfig;
@Override
public void init(StreamConfig streamConfig) {
- _streamConfig = streamConfig;
+ _kinesisConfig = new KinesisConfig(streamConfig);
}
@Override
public PartitionGroupMetadataMap getPartitionGroupsMetadata(
PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
- return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
- _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
+ return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion());
}
@Override
@@ -50,6 +48,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
@Override
public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
- return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
+ return new KinesisConsumer(_kinesisConfig, metadata);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org