You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2020/07/21 21:19:36 UTC

MaxConnections understanding on FlinkKinesisProducer via KPL

Hi,
Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
stream(KDS).
Getting following errors:
1.
Throttling
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)

2. ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
[shard_map.cc:150] Shard map update for stream "...._write" failed.
Code: *LimitExceededException
Message: Rate exceeded for stream *..._write under account 753274046439.;
retrying in 1500 ms

3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/

These are the KPL property changes I am planning to make.

*RequestTimeput*: 10000 //default 6000 ms

*AggregationEnabled*: true //default is true

*ThreadPoolSize*: *15* //default 10

*MaxConnections*: *48* //default 24 - this might have been a bottleneck
when we flooded KPL with requests. Requests are sent in parallel over
multiple connections to the backend.

*RecordTtl*: *10000* //default 30000 ms  - drop record after 10s.

*FailIfThrottled*: *true* //default false - so if throttled, don't retry.


We were using parallelism for sinks at 80. So each corresponds to 1
FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
MaxConnections is 24 from KPL.

I am not sure about the MaxConnections setting - what does 48 mean here -is
it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS backend
via KPL ?

Any thoughts on how not to overwhelm KPL while handling real time streaming
load to the Kinesis via the FlinkKinesisProducer ?

TIA,

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
ThreadPoolSize is per KPL instance, so yes that is per subtask.
As I previously mentioned, the maximum concurrent requests going to KDS
would be capped by MaxConnections.

On Thu, Jul 23, 2020 at 6:25 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Hi Gordon,
> Thx for your reply.
> FlinkKinesisProducer default is ThreadPool which is what I am using. So,
> does that mean only 10 threads are making calls to KDS by default ??
> I see from the number of records coming to the KDS that I need only 1-2
> shards. So, the bottleneck is on the KPL side.
> Does this mean I have to set a QueueLimit of 500 as shown in the example
> below ??
> From what you said, Total MaxConnections would then be by default: 24 *
> number of subtasks = 24 * 80 = 1920 connections to KDS.
> KPL ThreadPoolSize would be 10 Threads by default - is this per subtask ?
> So, would it be 10 * number of subtasks = 10 * 80 = 800 Threads ??
>
> I am trying to reconcile the diff above ? Somewhere I am flooding KPL with
> too many requests & it gives the curl 28 error.
>
> So, calculating Queue Limit:
> Based on this, my records size = 1600 bytes. I have 96 shards
> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
>  expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> I'm not entirely sure of the semantics between ThreadPoolSize and
>> MaxConnections since they are all KPL configurations (this specific
>> question would probably be better directed to AWS),
>> but my guess would be that the number of concurrent requests to the KPL
>> backend is capped by MaxConnections. This is per parallel
>> FlinkKinesisProducer subtask.
>>
>> As for ThreadPoolSize, do note that the default threading model by KPL is
>> PER_REQUEST, for which the KPL native process will launch a thread for each
>> request.
>> Under heavy load, this would of course be an issue. Since you didn't
>> explicitly mention this config, make sure to set this to POOLED to actually
>> make use of a fixed thread pool for requests.
>>
>> Overall, my suggestion is to set a reasonable queue limit for the number
>> of records buffered by KPL's native process (by default it is unbounded).
>> Without that in place, under high load you would easily be resource
>> exhausted, and can cause more unpredictable checkpointing times since the
>> FlinkKinesisProducer would need to flush pending records on checkpoints
>> (which ultimately also applies backpressure upstream).
>>
>> BR,
>> Gordon
>>
>> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bv...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
>>> stream(KDS).
>>> Getting following errors:
>>> 1.
>>> Throttling
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>
>>>  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>>
>>> 2. ERROR
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>>  - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
>>> [shard_map.cc:150] Shard map update for stream "...._write" failed. Code: *LimitExceededException
>>> Message: Rate exceeded for stream *..._write under account
>>> 753274046439.; retrying in 1500 ms
>>>
>>> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>>>
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>>
>>>
>>> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>>>
>>>
>>> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>>>
>>> These are the KPL property changes I am planning to make.
>>>
>>> *RequestTimeput*: 10000 //default 6000 ms
>>>
>>> *AggregationEnabled*: true //default is true
>>>
>>> *ThreadPoolSize*: *15* //default 10
>>>
>>> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
>>> when we flooded KPL with requests. Requests are sent in parallel over
>>> multiple connections to the backend.
>>>
>>> *RecordTtl*: *10000* //default 30000 ms  - drop record after 10s.
>>>
>>> *FailIfThrottled*: *true* //default false - so if throttled, don't
>>> retry.
>>>
>>>
>>> We were using parallelism for sinks at 80. So each corresponds to 1
>>> FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
>>> MaxConnections is 24 from KPL.
>>>
>>> I am not sure about the MaxConnections setting - what does 48 mean here
>>> -is it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS
>>> backend via KPL ?
>>>
>>> Any thoughts on how not to overwhelm KPL while handling real time
>>> streaming load to the Kinesis via the FlinkKinesisProducer ?
>>>
>>> TIA,
>>>
>>

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Gordon,
Thx for your reply.
FlinkKinesisProducer default is ThreadPool which is what I am using. So,
does that mean only 10 threads are making calls to KDS by default ??
I see from the number of records coming to the KDS that I need only 1-2
shards. So, the bottleneck is on the KPL side.
Does this mean I have to set a QueueLimit of 500 as shown in the example
below ??
From what you said, Total MaxConnections would then be by default: 24 *
number of subtasks = 24 * 80 = 1920 connections to KDS.
KPL ThreadPoolSize would be 10 Threads by default - is this per subtask ?
So, would it be 10 * number of subtasks = 10 * 80 = 800 Threads ??

I am trying to reconcile the diff above ? Somewhere I am flooding KPL with
too many requests & it gives the curl 28 error.

So, calculating Queue Limit:
Based on this, my records size = 1600 bytes. I have 96 shards
Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size of
100kB per shard should be sufficient.So, Queue size/shard=100KB
Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25

Acc. to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
 expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Vijay,
>
> I'm not entirely sure of the semantics between ThreadPoolSize and
> MaxConnections since they are all KPL configurations (this specific
> question would probably be better directed to AWS),
> but my guess would be that the number of concurrent requests to the KPL
> backend is capped by MaxConnections. This is per parallel
> FlinkKinesisProducer subtask.
>
> As for ThreadPoolSize, do note that the default threading model by KPL is
> PER_REQUEST, for which the KPL native process will launch a thread for each
> request.
> Under heavy load, this would of course be an issue. Since you didn't
> explicitly mention this config, make sure to set this to POOLED to actually
> make use of a fixed thread pool for requests.
>
> Overall, my suggestion is to set a reasonable queue limit for the number
> of records buffered by KPL's native process (by default it is unbounded).
> Without that in place, under high load you would easily be resource
> exhausted, and can cause more unpredictable checkpointing times since the
> FlinkKinesisProducer would need to flush pending records on checkpoints
> (which ultimately also applies backpressure upstream).
>
> BR,
> Gordon
>
> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Hi,
>> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
>> stream(KDS).
>> Getting following errors:
>> 1.
>> Throttling
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>
>>  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>
>> 2. ERROR
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
>> [shard_map.cc:150] Shard map update for stream "...._write" failed. Code: *LimitExceededException
>> Message: Rate exceeded for stream *..._write under account
>> 753274046439.; retrying in 1500 ms
>>
>> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>>
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>
>>
>> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>>
>>
>> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>>
>> These are the KPL property changes I am planning to make.
>>
>> *RequestTimeput*: 10000 //default 6000 ms
>>
>> *AggregationEnabled*: true //default is true
>>
>> *ThreadPoolSize*: *15* //default 10
>>
>> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
>> when we flooded KPL with requests. Requests are sent in parallel over
>> multiple connections to the backend.
>>
>> *RecordTtl*: *10000* //default 30000 ms  - drop record after 10s.
>>
>> *FailIfThrottled*: *true* //default false - so if throttled, don't retry.
>>
>>
>> We were using parallelism for sinks at 80. So each corresponds to 1
>> FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
>> MaxConnections is 24 from KPL.
>>
>> I am not sure about the MaxConnections setting - what does 48 mean here
>> -is it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS
>> backend via KPL ?
>>
>> Any thoughts on how not to overwhelm KPL while handling real time
>> streaming load to the Kinesis via the FlinkKinesisProducer ?
>>
>> TIA,
>>
>

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Vijay,

I'm not entirely sure of the semantics between ThreadPoolSize and
MaxConnections since they are all KPL configurations (this specific
question would probably be better directed to AWS),
but my guess would be that the number of concurrent requests to the KPL
backend is capped by MaxConnections. This is per parallel
FlinkKinesisProducer subtask.

As for ThreadPoolSize, do note that the default threading model by KPL is
PER_REQUEST, for which the KPL native process will launch a thread for each
request.
Under heavy load, this would of course be an issue. Since you didn't
explicitly mention this config, make sure to set this to POOLED to actually
make use of a fixed thread pool for requests.

Overall, my suggestion is to set a reasonable queue limit for the number of
records buffered by KPL's native process (by default it is unbounded).
Without that in place, under high load you would easily be resource
exhausted, and can cause more unpredictable checkpointing times since the
FlinkKinesisProducer would need to flush pending records on checkpoints
(which ultimately also applies backpressure upstream).

BR,
Gordon

On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Hi,
> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
> stream(KDS).
> Getting following errors:
> 1.
> Throttling
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
>  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>
> 2. ERROR
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
> [shard_map.cc:150] Shard map update for stream "...._write" failed. Code: *LimitExceededException
> Message: Rate exceeded for stream *..._write under account 753274046439.;
> retrying in 1500 ms
>
> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>
>
> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>
>
> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>
> These are the KPL property changes I am planning to make.
>
> *RequestTimeput*: 10000 //default 6000 ms
>
> *AggregationEnabled*: true //default is true
>
> *ThreadPoolSize*: *15* //default 10
>
> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
> when we flooded KPL with requests. Requests are sent in parallel over
> multiple connections to the backend.
>
> *RecordTtl*: *10000* //default 30000 ms  - drop record after 10s.
>
> *FailIfThrottled*: *true* //default false - so if throttled, don't retry.
>
>
> We were using parallelism for sinks at 80. So each corresponds to 1
> FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
> MaxConnections is 24 from KPL.
>
> I am not sure about the MaxConnections setting - what does 48 mean here
> -is it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS
> backend via KPL ?
>
> Any thoughts on how not to overwhelm KPL while handling real time
> streaming load to the Kinesis via the FlinkKinesisProducer ?
>
> TIA,
>