You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/12/07 07:08:39 UTC
[pinot] branch master updated: Add rate limit to Kinesis requests (#9863)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76c649258c Add rate limit to Kinesis requests (#9863)
76c649258c is described below
commit 76c649258c625d431a42ff1fbc1b3003fe013066
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Dec 7 12:38:33 2022 +0530
Add rate limit to Kinesis requests (#9863)
* Add rate limit to Kinesis requests
* Throw warning on zero RPS and rate limit all non-empty requests as well
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 20 ++++++++++++++++++++
.../plugin/stream/kinesis/KinesisConsumer.java | 22 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index d8a3795a2a..47adf78c20 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.UUID;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -30,6 +32,8 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
* Kinesis stream specific config
*/
public class KinesisConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConfig.class);
+
public static final String STREAM_TYPE = "kinesis";
public static final String SHARD_ITERATOR_TYPE = "shardIteratorType";
public static final String REGION = "region";
@@ -37,6 +41,8 @@ public class KinesisConfig {
public static final String SECRET_KEY = "secretKey";
public static final String MAX_RECORDS_TO_FETCH = "maxRecordsToFetch";
public static final String ENDPOINT = "endpoint";
+ public static final String RPS_LIMIT = "requests_per_second_limit";
+
// IAM role configs
/**
@@ -64,6 +70,7 @@ public class KinesisConfig {
public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
+ public static final String DEFAULT_RPS_LIMIT = "5";
private final String _streamTopicName;
private final String _awsRegion;
@@ -80,6 +87,7 @@ public class KinesisConfig {
private String _externalId;
private int _sessionDurationSeconds;
private boolean _asyncSessionUpdateEnabled;
+ private int _rpsLimit;
public KinesisConfig(StreamConfig streamConfig) {
Map<String, String> props = streamConfig.getStreamConfigsMap();
@@ -88,6 +96,14 @@ public class KinesisConfig {
Preconditions.checkNotNull(_awsRegion, "Must provide 'region' in stream config for table: %s",
streamConfig.getTableNameWithType());
_numMaxRecordsToFetch = Integer.parseInt(props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+ _rpsLimit = Integer.parseInt(props.getOrDefault(RPS_LIMIT, DEFAULT_RPS_LIMIT));
+
+ if (_rpsLimit <= 0) {
+ LOGGER.warn("Invalid 'requests_per_second_limit' value: {}."
+ + " Please provide value greater than 0. Using default: {}", _rpsLimit, DEFAULT_RPS_LIMIT);
+ _rpsLimit = Integer.parseInt(DEFAULT_RPS_LIMIT);
+ }
+
_shardIteratorType =
ShardIteratorType.fromValue(props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
_accessKey = props.get(ACCESS_KEY);
@@ -124,6 +140,10 @@ public class KinesisConfig {
return _numMaxRecordsToFetch;
}
+ public int getRpsLimit() {
+ return _rpsLimit;
+ }
+
public ShardIteratorType getShardIteratorType() {
return _shardIteratorType;
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 3fad3a39a6..3ded54ec8f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -52,16 +52,19 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
*/
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class);
+ public static final long SLEEP_TIME_BETWEEN_REQUESTS = 1000L;
private final String _streamTopicName;
private final int _numMaxRecordsToFetch;
private final ExecutorService _executorService;
private final ShardIteratorType _shardIteratorType;
+ private final int _rpsLimit;
public KinesisConsumer(KinesisConfig kinesisConfig) {
super(kinesisConfig);
_streamTopicName = kinesisConfig.getStreamTopicName();
_numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
_shardIteratorType = kinesisConfig.getShardIteratorType();
+ _rpsLimit = kinesisConfig.getRpsLimit();
_executorService = Executors.newSingleThreadExecutor();
}
@@ -72,6 +75,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
_streamTopicName = kinesisConfig.getStreamTopicName();
_numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
_shardIteratorType = kinesisConfig.getShardIteratorType();
+ _rpsLimit = kinesisConfig.getRpsLimit();
_executorService = Executors.newSingleThreadExecutor();
}
@@ -125,9 +129,12 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
String nextStartSequenceNumber;
boolean isEndOfShard = false;
-
+ long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS;
+ int currentWindowRequests = 0;
while (shardIterator != null) {
GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+
+ long requestSentTime = System.currentTimeMillis() / 1000;
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
if (!getRecordsResponse.records().isEmpty()) {
@@ -155,6 +162,19 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
if (Thread.interrupted()) {
break;
}
+
+ // On the AWS side, Kinesis enforces a limit of 5 getRecords requests per second on each shard.
+ // Beyond this limit we start getting ProvisionedThroughputExceededException, which affects the ingestion.
+ if (requestSentTime == currentWindow) {
+ currentWindowRequests++;
+ } else if (requestSentTime > currentWindow) {
+ currentWindow = requestSentTime;
+ currentWindowRequests = 0;
+ }
+
+ if (currentWindowRequests >= _rpsLimit) {
+ Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
+ }
}
return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org