You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/12/07 07:08:39 UTC

[pinot] branch master updated: Add rate limit to Kinesis requests (#9863)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76c649258c Add rate limit to Kinesis requests (#9863)
76c649258c is described below

commit 76c649258c625d431a42ff1fbc1b3003fe013066
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Dec 7 12:38:33 2022 +0530

    Add rate limit to Kinesis requests (#9863)
    
    * Add rate limit to Kinesis requests
    
    * Log a warning on a non-positive RPS limit and rate-limit all non-empty requests as well
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 20 ++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 22 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index d8a3795a2a..47adf78c20 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
@@ -30,6 +32,8 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
  * Kinesis stream specific config
  */
 public class KinesisConfig {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConfig.class);
+
   public static final String STREAM_TYPE = "kinesis";
   public static final String SHARD_ITERATOR_TYPE = "shardIteratorType";
   public static final String REGION = "region";
@@ -37,6 +41,8 @@ public class KinesisConfig {
   public static final String SECRET_KEY = "secretKey";
   public static final String MAX_RECORDS_TO_FETCH = "maxRecordsToFetch";
   public static final String ENDPOINT = "endpoint";
+  public static final String RPS_LIMIT = "requests_per_second_limit";
+
 
   // IAM role configs
   /**
@@ -64,6 +70,7 @@ public class KinesisConfig {
   public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
   public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
   public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
+  public static final String DEFAULT_RPS_LIMIT = "5";
 
   private final String _streamTopicName;
   private final String _awsRegion;
@@ -80,6 +87,7 @@ public class KinesisConfig {
   private String _externalId;
   private int _sessionDurationSeconds;
   private boolean _asyncSessionUpdateEnabled;
+  private int _rpsLimit;
 
   public KinesisConfig(StreamConfig streamConfig) {
     Map<String, String> props = streamConfig.getStreamConfigsMap();
@@ -88,6 +96,14 @@ public class KinesisConfig {
     Preconditions.checkNotNull(_awsRegion, "Must provide 'region' in stream config for table: %s",
         streamConfig.getTableNameWithType());
     _numMaxRecordsToFetch = Integer.parseInt(props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+    _rpsLimit = Integer.parseInt(props.getOrDefault(RPS_LIMIT, DEFAULT_RPS_LIMIT));
+
+    if (_rpsLimit <= 0) {
+      LOGGER.warn("Invalid 'requests_per_second_limit' value: {}."
+          + " Please provide value greater than 0. Using default: {}", _rpsLimit, DEFAULT_RPS_LIMIT);
+      _rpsLimit = Integer.parseInt(DEFAULT_RPS_LIMIT);
+    }
+
     _shardIteratorType =
         ShardIteratorType.fromValue(props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
     _accessKey = props.get(ACCESS_KEY);
@@ -124,6 +140,10 @@ public class KinesisConfig {
     return _numMaxRecordsToFetch;
   }
 
+  public int getRpsLimit() {
+    return _rpsLimit;
+  }
+
   public ShardIteratorType getShardIteratorType() {
     return _shardIteratorType;
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 3fad3a39a6..3ded54ec8f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -52,16 +52,19 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
  */
 public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
   private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class);
+  public static final long SLEEP_TIME_BETWEEN_REQUESTS = 1000L;
   private final String _streamTopicName;
   private final int _numMaxRecordsToFetch;
   private final ExecutorService _executorService;
   private final ShardIteratorType _shardIteratorType;
+  private final int _rpsLimit;
 
   public KinesisConsumer(KinesisConfig kinesisConfig) {
     super(kinesisConfig);
     _streamTopicName = kinesisConfig.getStreamTopicName();
     _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
     _shardIteratorType = kinesisConfig.getShardIteratorType();
+    _rpsLimit = kinesisConfig.getRpsLimit();
     _executorService = Executors.newSingleThreadExecutor();
   }
 
@@ -72,6 +75,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     _streamTopicName = kinesisConfig.getStreamTopicName();
     _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
     _shardIteratorType = kinesisConfig.getShardIteratorType();
+    _rpsLimit = kinesisConfig.getRpsLimit();
     _executorService = Executors.newSingleThreadExecutor();
   }
 
@@ -125,9 +129,12 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
 
       String nextStartSequenceNumber;
       boolean isEndOfShard = false;
-
+      long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS;
+      int currentWindowRequests = 0;
       while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+
+        long requestSentTime = System.currentTimeMillis() / 1000;
         GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
         if (!getRecordsResponse.records().isEmpty()) {
@@ -155,6 +162,19 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
         if (Thread.interrupted()) {
           break;
         }
+
+        // Kinesis enforces a limit of 5 GetRecords requests per second per shard on the AWS side.
+        // Beyond this limit we start getting ProvisionedThroughputExceededException, which affects ingestion.
+        if (requestSentTime == currentWindow) {
+          currentWindowRequests++;
+        } else if (requestSentTime > currentWindow) {
+          currentWindow = requestSentTime;
+          currentWindowRequests = 0;
+        }
+
+        if (currentWindowRequests >= _rpsLimit) {
+          Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
+        }
       }
 
       return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org