You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/12 01:31:45 UTC

[GitHub] glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679
 
 
   ## What is the purpose of the change
   This pull request adds a ratelimiting feature to the Flink Kafka consumer. There are instances when a Flink job that reads from Kafka can read at a significantly high throughput (particularly while processing a backlog) and degrade the underlying Kafka cluster. While Kafka quotas are perhaps the best way to enforce this ratelimiting, there are cases where such a setup is not available or easily enabled. In such a scenario, ratelimiting on the FlinkKafkaConsumer is a useful feature. 
   
   ## Brief change log
     - This feature is set by using a feature flag - `kafka.consumer.ratelimiting.enabled`
     -  A`RateLimiterFactory` is used to configure and create a Guava [RateLimiter](https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) with a desired rate.
     -  The `consumer.poll()` part of the `run()` loop in the `KafkaConsumerThread` is modularized into a `getRecordsFromKafka()` method.
     - The rate is controlled by setting the bytes received from Kafka as the parameter to the `acquire()` call. 
    
   ## Verifying this change
   This change added tests and can be verified as follows:
    - Added a `testRateLimiting()` test in the `KafkaConsumerThreadTest` class.
    - Manually verified the change using a test application and screenshots of results are added [here](https://issues.apache.org/jira/browse/FLINK-11501?focusedCommentId=16762965&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16762965). 
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services