You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Steffen Hausmann <st...@hausmann-family.de> on 2017/03/22 18:12:21 UTC

Kinesis connector SHARD_GETRECORDS_MAX default value

Hi there,

I recently ran into problems with a Flink job running on an EMR cluster 
consuming events from a Kinesis stream receiving roughly 15k 
event/second. Although the EMR cluster was substantially scaled and CPU 
utilization and system load were well below any alarming threshold, the 
processing of events of the stream increasingly fell behind.

Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100 
which is apparently causing too much overhead when consuming events from 
the stream. Increasing the value to 5000, a single GetRecords call to 
Kinesis can retrieve up to 10k records, made the problem go away.

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low 
(100x less than it could be). The Kinesis Client Library defaults to 
5000 and it's recommended to use this default value: 
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.

Thanks for the clarification!

Cheers,
Steffen

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Steffen,

Thanks for bringing up the discussion!

I think the reason why SHARD_GETRECORDS_INTERVAL_MILLIS was defaulted to 0 in the first place was because we didn’t want false impressions that the there was some latency introduced in Flink with the default settings.
To this end, I’m leaning towards not touching SHARD_GETRECORDS_INTERVAL_MILLIS.
Ideally, the docs in this section [1] should guide the user to tweak this setting if they’re having issues with competing apps also consuming the shards. Could also improve this if you think the notice for this issue needs to be more prominent.

However, I do suggest changing SHARD_GETRECORDS_MAX to a higher value. 100 seems to be too small by default.
Increasing that should also be safe in the sense that it would not introduce unexpected behavior changes for the user.
What do you think?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html#internally-used-kinesis-apis

On 23 June 2017 at 4:30:10 AM, Steffen Hausmann (steffen@hausmann-family.de) wrote:

Hi Gordon,  

Regarding the value for SHARD_GETRECORDS_INTERVAL_MILLIS.  

The best practice would be to set this value to 1000, as this settings  
allows other applications to read from the same Kinesis stream. However,  
this may considerably increase the latency of the respective Flink  
application as events are only consumed every second.  

Alternatively, the default value be as low as 200 (you can currently  
only read from a shard 5 times a second, hence values lower than 200 are  
undesirable), which reduces the latency of a single Flink application,  
but causes undesirable effects when multiple applications consume events  
from the same Kinesis stream.  

I'd prefer setting the default to 1000, but I wanted to get your opinion  
on this before I submit the PR.  

Cheers,  
Steffen  

On 24/04/2017 00:39, Tzu-Li (Gordon) Tai wrote:  
> Thanks for filing the JIRA!  
>  
> Would you also be up to open a PR to for the change? That would be very  
> very helpful :)  
>  
> Cheers,  
> Gordon  
>  
> On 24 April 2017 at 3:27:48 AM, Steffen Hausmann  
> (steffen@hausmann-family.de <ma...@hausmann-family.de>) wrote:  
>  
>> Hi Gordon,  
>>  
>> thanks for looking into this and sorry it took me so long to file the  
>> issue: https://issues.apache.org/jira/browse/FLINK-6365.  
>>  
>> Really appreciate your contributions for the Kinesis connector!  
>>  
>> Cheers,  
>> Steffen  
>>  
>> On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:  
>> > Hi Steffan,  
>> >  
>> > I have to admit that I didn’t put too much thoughts in the default  
>> > values for the Kinesis consumer.  
>> >  
>> > I’d say it would be reasonable to change the default values to follow  
>> > KCL’s settings. Could you file a JIRA for this?  
>> >  
>> > In general, we might want to reconsider all the default values for  
>> > configs related to the getRecords call, i.e.  
>> > - SHARD_GETRECORDS_MAX  
>> > - SHARD_GETRECORDS_INTERVAL_MILLIS  
>> > - SHARD_GETRECORDS_BACKOFF_*  
>> >  
>> > Cheers,  
>> > Gordon  
>> >  
>> > On March 23, 2017 at 2:12:32 AM, Steffen Hausmann  
>> > (steffen@hausmann-family.de <ma...@hausmann-family.de>) wrote:  
>> >  
>> >> Hi there,  
>> >>  
>> >> I recently ran into problems with a Flink job running on an EMR cluster  
>> >> consuming events from a Kinesis stream receiving roughly 15k  
>> >> event/second. Although the EMR cluster was substantially scaled and CPU  
>> >> utilization and system load were well below any alarming threshold, the  
>> >> processing of events of the stream increasingly fell behind.  
>> >>  
>> >> Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
>> >> which is apparently causing too much overhead when consuming events from  
>> >> the stream. Increasing the value to 5000, a single GetRecords call to  
>> >> Kinesis can retrieve up to 10k records, made the problem go away.  
>> >>  
>> >> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
>> >> (100x less than it could be). The Kinesis Client Library defaults to  
>> >> 5000 and it's recommended to use this default value:  
>> >> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.  
>> >>  
>> >>  
>> >> Thanks for the clarification!  
>> >>  
>> >> Cheers,  
>> >> Steffen  

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Thanks for filing the JIRA!

Would you also be up to open a PR to for the change? That would be very very helpful :)

Cheers,
Gordon

On 24 April 2017 at 3:27:48 AM, Steffen Hausmann (steffen@hausmann-family.de) wrote:

Hi Gordon,  

thanks for looking into this and sorry it took me so long to file the  
issue: https://issues.apache.org/jira/browse/FLINK-6365.  

Really appreciate your contributions for the Kinesis connector!  

Cheers,  
Steffen  

On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:  
> Hi Steffan,  
>  
> I have to admit that I didn’t put too much thoughts in the default  
> values for the Kinesis consumer.  
>  
> I’d say it would be reasonable to change the default values to follow  
> KCL’s settings. Could you file a JIRA for this?  
>  
> In general, we might want to reconsider all the default values for  
> configs related to the getRecords call, i.e.  
> - SHARD_GETRECORDS_MAX  
> - SHARD_GETRECORDS_INTERVAL_MILLIS  
> - SHARD_GETRECORDS_BACKOFF_*  
>  
> Cheers,  
> Gordon  
>  
> On March 23, 2017 at 2:12:32 AM, Steffen Hausmann  
> (steffen@hausmann-family.de <ma...@hausmann-family.de>) wrote:  
>  
>> Hi there,  
>>  
>> I recently ran into problems with a Flink job running on an EMR cluster  
>> consuming events from a Kinesis stream receiving roughly 15k  
>> event/second. Although the EMR cluster was substantially scaled and CPU  
>> utilization and system load were well below any alarming threshold, the  
>> processing of events of the stream increasingly fell behind.  
>>  
>> Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
>> which is apparently causing too much overhead when consuming events from  
>> the stream. Increasing the value to 5000, a single GetRecords call to  
>> Kinesis can retrieve up to 10k records, made the problem go away.  
>>  
>> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
>> (100x less than it could be). The Kinesis Client Library defaults to  
>> 5000 and it's recommended to use this default value:  
>> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.  
>>  
>>  
>> Thanks for the clarification!  
>>  
>> Cheers,  
>> Steffen  

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Posted by Steffen Hausmann <st...@hausmann-family.de>.
Hi Gordon,

thanks for looking into this and sorry it took me so long to file the 
issue: https://issues.apache.org/jira/browse/FLINK-6365.

Really appreciate your contributions for the Kinesis connector!

Cheers,
Steffen

On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:
> Hi Steffan,
>
> I have to admit that I didn\u2019t put too much thoughts in the default
> values for the Kinesis consumer.
>
> I\u2019d say it would be reasonable to change the default values to follow
> KCL\u2019s settings. Could you file a JIRA for this?
>
> In general, we might want to reconsider all the default values for
> configs related to the getRecords call, i.e.
> - SHARD_GETRECORDS_MAX
> - SHARD_GETRECORDS_INTERVAL_MILLIS
> - SHARD_GETRECORDS_BACKOFF_*
>
> Cheers,
> Gordon
>
> On March 23, 2017 at 2:12:32 AM, Steffen Hausmann
> (steffen@hausmann-family.de <ma...@hausmann-family.de>) wrote:
>
>> Hi there,
>>
>> I recently ran into problems with a Flink job running on an EMR cluster
>> consuming events from a Kinesis stream receiving roughly 15k
>> event/second. Although the EMR cluster was substantially scaled and CPU
>> utilization and system load were well below any alarming threshold, the
>> processing of events of the stream increasingly fell behind.
>>
>> Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100
>> which is apparently causing too much overhead when consuming events from
>> the stream. Increasing the value to 5000, a single GetRecords call to
>> Kinesis can retrieve up to 10k records, made the problem go away.
>>
>> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low
>> (100x less than it could be). The Kinesis Client Library defaults to
>> 5000 and it's recommended to use this default value:
>> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.
>>
>>
>> Thanks for the clarification!
>>
>> Cheers,
>> Steffen

Re: Kinesis connector SHARD_GETRECORDS_MAX default value

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Steffan,

I have to admit that I didn’t put too much thoughts in the default values for the Kinesis consumer.

I’d say it would be reasonable to change the default values to follow KCL’s settings. Could you file a JIRA for this?

In general, we might want to reconsider all the default values for configs related to the getRecords call, i.e.
- SHARD_GETRECORDS_MAX
- SHARD_GETRECORDS_INTERVAL_MILLIS
- SHARD_GETRECORDS_BACKOFF_*

Cheers,
Gordon

On March 23, 2017 at 2:12:32 AM, Steffen Hausmann (steffen@hausmann-family.de) wrote:

Hi there,  

I recently ran into problems with a Flink job running on an EMR cluster  
consuming events from a Kinesis stream receiving roughly 15k  
event/second. Although the EMR cluster was substantially scaled and CPU  
utilization and system load were well below any alarming threshold, the  
processing of events of the stream increasingly fell behind.  

Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
which is apparently causing too much overhead when consuming events from  
the stream. Increasing the value to 5000, a single GetRecords call to  
Kinesis can retrieve up to 10k records, made the problem go away.  

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
(100x less than it could be). The Kinesis Client Library defaults to  
5000 and it's recommended to use this default value:  
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.  

Thanks for the clarification!  

Cheers,  
Steffen