Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/10 18:59:00 UTC

[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

    [ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539095#comment-16539095 ] 

ASF GitHub Bot commented on FLINK-9692:
---------------------------------------

GitHub user glaksh100 opened a pull request:

    https://github.com/apache/flink/pull/6300

    [FLINK-9692] Adaptive reads from Kinesis

    ## What is the purpose of the change
    
    The purpose of this change is to provide an option to the Kinesis connector to optimize the amount of data (in bytes) read from Kinesis. The Kinesis connector currently has a [constant value](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213) set for `maxNumberOfRecordsPerFetch` that it can fetch from a single Kinesis `getRecords` call. However, in most realtime scenarios, the average size of the Kinesis record (in bytes) is not constant.
    The idea here is to adapt the Kinesis connector to identify an average batch size prior to making the `getRecords` call, so that the `maxNumberOfRecordsPerFetch` parameter can be tuned to be as high as possible without exceeding the 2 MB/sec [per shard limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html).
    
    This feature is controlled by a [ConsumerConfigConstants](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java) flag that defaults to false.
    
    
    ## Brief change log
      - Starting from an initial value for `maxNumberOfRecordsPerFetch`, the average size of a record in the returned batch of records is calculated
      - `maxNumberOfRecordsPerFetch` is then set to `2 MB/sec / (average size of record / fetchIntervalMillis)` to maximize throughput in each `getRecords` call
      - This feature is turned on/off using a boolean in `ConsumerConfigConstants`: `SHARD_USE_ADAPTIVE_READS`
      - `DEFAULT_SHARD_USE_ADAPTIVE_READS` is set to `false`
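    
    The calculation described in the change log can be sketched roughly as follows. This is an illustrative sketch only, not the code from this pull request: the class `AdaptiveReadsSketch`, the method `adaptMaxRecords`, and both constants are hypothetical names. It assumes the 2 MB/sec limit means 2 * 1024 * 1024 bytes, that the fetch interval is given in milliseconds and converted to seconds, and that the result is clamped to the 10,000-record maximum of a single `getRecords` call.
    
    ```java
    /** Hypothetical sketch of adaptive reads; not the actual ShardConsumer code. */
    final class AdaptiveReadsSketch {
        // Assumption: the 2 MB/sec per-shard read limit expressed in bytes.
        static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;
        // A single getRecords call returns at most 10,000 records.
        static final int MAX_RECORDS_PER_CALL = 10_000;
    
        /**
         * Returns the record limit to request in the next getRecords call,
         * based on the size of the batch returned by the previous call.
         */
        static int adaptMaxRecords(
                boolean useAdaptiveReads,
                int currentMaxRecords,
                int numRecords,
                long batchSizeBytes,
                long fetchIntervalMillis) {
            // Keep the configured value when the feature is off or there is no usable sample.
            if (!useAdaptiveReads || numRecords <= 0 || batchSizeBytes <= 0 || fetchIntervalMillis <= 0) {
                return currentMaxRecords;
            }
            long averageRecordSizeBytes = Math.max(1L, batchSizeBytes / numRecords);
            // Bytes the per-shard limit allows over one fetch interval.
            long bytesPerFetch = SHARD_BYTES_PER_SECOND_LIMIT * fetchIntervalMillis / 1000L;
            long adapted = bytesPerFetch / averageRecordSizeBytes;
            // Always request at least one record and never more than the per-call maximum.
            return (int) Math.max(1L, Math.min(adapted, MAX_RECORDS_PER_CALL));
        }
    
        public static void main(String[] args) {
            // Previous batch: 10 records totalling 10,240 bytes, fetched once per second.
            System.out.println(adaptMaxRecords(true, 10_000, 10, 10_240L, 1_000L));
        }
    }
    ```
    
    With larger records or a shorter fetch interval the returned limit shrinks, and with very small records it saturates at the per-call maximum.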
    
    ## Verifying this change
    This change added tests and can be verified as follows:
      - Added a `testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads` test method to `ShardConsumerTest`
    
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glaksh100/flink FLINK-9692.adaptiveKinesisReads

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6300
    
----
commit 0c29017d6d1e98359d3093aaaecc54338324e57e
Author: Lakshmi Gururaja Rao <gl...@...>
Date:   2018-07-10T18:40:02Z

    [FLINK-9692] Adaptive reads from Kinesis

----


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis 
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9692
>                 URL: https://issues.apache.org/jira/browse/FLINK-9692
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Lakshmi Rao
>            Priority: Major
>              Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] set for maxRecords that it can fetch from a single Kinesis getRecords call. However, in most realtime scenarios, the average size of a Kinesis record (in bytes) changes depending on the situation. For example, you could be in a transient scenario where you are reading large records and would hence like to fetch fewer records in each getRecords call, so as not to exceed the 2 MB/sec [per shard limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] on the getRecords call.
> The idea here is to adapt the Kinesis connector to identify an average batch size prior to making the getRecords call, so that the maxRecords parameter can be appropriately tuned before making the call. 
> This feature can be placed behind a [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] flag that defaults to false.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)