You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2020/07/07 16:22:45 UTC

FlinkKinesisProducer blocking ?

Hi,
current setup.

Kinesis stream 1 -----> Kinesis Analytics Flink -----> Kinesis stream 2
|
----> Firehose Delivery stream

Curl eror:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 - [2020-07-02 15:22:32.203053] [0x000007f4][0x00007ffbced15700] [error]
[AWS Log: ERROR](CurlHttpClient)Curl returned error code 28

But I am still seeing tons of the curl 28 error. I use parallelism of 80
for the Sink to Kinesis Data stream(KDS). Which seems to point to KDS being
pounded with too many requests - the 80(parallelism) * 10(ThreadPool size)
= 800 requests. Is my understanding correct ? So, maybe reduce the 80
parallelism ??
*I still don't understand why the logs are stuck with just
FlinkKInesisProducer for around 4s(blocking calls???) *with the rest of the
Flink Analytics application not producing any logs while this happens.
*I noticed that the FlinkKInesisProducer took about 3.785secs, 3.984s,
4.223s in between other application logs in Kibana when the Kinesis
GetIterator Age peaked*. It seemed like FlinkKinesisProducer was blocking
for that long when the Flink app was not able to generate any other logs.

Looked at this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure

Could use this:
producerConfig.put("RequestTimeout", "10000");//from 6000

But doesn't really solve the problem when trying to maintain a real time
processing system.

TIA

Re: FlinkKinesisProducer blocking ?

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Thanks Gordon,
So, 10(ThreadPoolSize) * 80 sub-tasks = 800 threads goes to a
Queue(unbounded by default). This then goes through KPL MaxConnections(24
by default) to KDS.

This suggests,  I need to decrease sub-tasks or setQueueLimit(800) and
increase MaxConnections=256 (max allowed).
Checkpointing is not currently enabled.

Pls correct me if I am wrong.

On Tue, Jul 21, 2020 at 7:40 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Vijay,
>
> ThreadPoolSize is for per Kinesis producer, which there is one for each
> parallel subtask.
> If you are constantly hitting the 1MB per second per shard quota, then the
> records will be buffered by the FlinkKinesisProducer.
> During this process, backpressure is not applied if you have not
> configured an upper bound for the buffer queue.
>
> One other thing to note, which might explain the backpresses at regular
> intervals that you are experiencing,
> is that the FlinkKinesisProducer needs to flush all pending records in the
> buffer before the checkpoint can complete for the sink.
> That would also apply backpressure upstream.
>
> Gordon
>
> On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Hi Gordon,
>> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
>> 32 nodes.
>> Could it be that the 80 threads get bottlenecked on a common ThreadPool
>> of 10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers
>> run in separate slots/vCPUs and can be spread across 32 nodes in my case
>> but occupying 80 slots/vCPUs. Is my understanding correct and will this be
>> the reason that the KPL gets flooded with too many pending requests at
>> regular intervals ??
>>
>> TIA,
>>
>> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bv...@gmail.com>
>> wrote:
>>
>>> Thanks,Gordon for your reply.
>>>
>>> I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
>>> So, it should just be dropping records being produced from the
>>> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
>>> do not want backpressure as you said it effectively blocks all upstream
>>> operators.
>>>
>>> But from what you are saying, it will apply backpressure when the number
>>> of outstanding records accumulated exceeds the default queue limit of 2147483647
>>> or* does it also do it if it is r**ate-limited* *to 1MB per second per
>>> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
>>> probable.
>>>
>>> So, calculating Queue Limit:
>>> Based on this, my records size = 1600 bytes. I have 96 shards
>>> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
>>> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
>>> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
>>> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>>>
>>> Acc. to the docs:
>>>
>>> By default, FlinkKinesisProducer does not backpressure. Instead,
>>> records that cannot be sent because of the rate restriction of 1 MB per
>>> second per shard are buffered in an unbounded queue and dropped when their
>>> RecordTtl expires.
>>>
>>> To avoid data loss, you can enable backpressuring by restricting the
>>> size of the internal queue:
>>>
>>> // 200 Bytes per record, 1 shard
>>> kinesis.setQueueLimit(500);
>>>
>>>
>>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
>>> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>>> It does however apply backpressure (therefore effectively blocking all
>>>> upstream operators) when the number of outstanding records accumulated
>>>> exceeds a set limit, configured using the
>>>> FlinkKinesisProducer#setQueueLimit
>>>> method.
>>>>
>>>> For starters, you can maybe check if that was set appropriately.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>

Re: FlinkKinesisProducer blocking ?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Vijay,

ThreadPoolSize is for per Kinesis producer, which there is one for each
parallel subtask.
If you are constantly hitting the 1MB per second per shard quota, then the
records will be buffered by the FlinkKinesisProducer.
During this process, backpressure is not applied if you have not configured
an upper bound for the buffer queue.

One other thing to note, which might explain the backpresses at regular
intervals that you are experiencing,
is that the FlinkKinesisProducer needs to flush all pending records in the
buffer before the checkpoint can complete for the sink.
That would also apply backpressure upstream.

Gordon

On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Hi Gordon,
> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
> 32 nodes.
> Could it be that the 80 threads get bottlenecked on a common ThreadPool of
> 10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
> in separate slots/vCPUs and can be spread across 32 nodes in my case but
> occupying 80 slots/vCPUs. Is my understanding correct and will this be the
> reason that the KPL gets flooded with too many pending requests at regular
> intervals ??
>
> TIA,
>
> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Thanks,Gordon for your reply.
>>
>> I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
>> So, it should just be dropping records being produced from the
>> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
>> do not want backpressure as you said it effectively blocks all upstream
>> operators.
>>
>> But from what you are saying, it will apply backpressure when the number
>> of outstanding records accumulated exceeds the default queue limit of 2147483647
>> or* does it also do it if it is r**ate-limited* *to 1MB per second per
>> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
>> probable.
>>
>> So, calculating Queue Limit:
>> Based on this, my records size = 1600 bytes. I have 96 shards
>> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
>> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
>> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
>> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>>
>> Acc. to the docs:
>>
>> By default, FlinkKinesisProducer does not backpressure. Instead, records
>> that cannot be sent because of the rate restriction of 1 MB per second per
>> shard are buffered in an unbounded queue and dropped when their RecordTtl
>> expires.
>>
>> To avoid data loss, you can enable backpressuring by restricting the size
>> of the internal queue:
>>
>> // 200 Bytes per record, 1 shard
>> kinesis.setQueueLimit(500);
>>
>>
>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>> It does however apply backpressure (therefore effectively blocking all
>>> upstream operators) when the number of outstanding records accumulated
>>> exceeds a set limit, configured using the
>>> FlinkKinesisProducer#setQueueLimit
>>> method.
>>>
>>> For starters, you can maybe check if that was set appropriately.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>

Re: FlinkKinesisProducer blocking ?

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Gordon,
ThreadPoolSize default is 10. I have parallelism of 80 spread out across 32
nodes.
Could it be that the 80 threads get bottlenecked on a common ThreadPool of
10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
in separate slots/vCPUs and can be spread across 32 nodes in my case but
occupying 80 slots/vCPUs. Is my understanding correct and will this be the
reason that the KPL gets flooded with too many pending requests at regular
intervals ??

TIA,

On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Thanks,Gordon for your reply.
>
> I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
> So, it should just be dropping records being produced from the
> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
> do not want backpressure as you said it effectively blocks all upstream
> operators.
>
> But from what you are saying, it will apply backpressure when the number
> of outstanding records accumulated exceeds the default queue limit of 2147483647
> or* does it also do it if it is r**ate-limited* *to 1MB per second per
> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
> probable.
>
> So, calculating Queue Limit:
> Based on this, my records size = 1600 bytes. I have 96 shards
> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
> expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>> It does however apply backpressure (therefore effectively blocking all
>> upstream operators) when the number of outstanding records accumulated
>> exceeds a set limit, configured using the
>> FlinkKinesisProducer#setQueueLimit
>> method.
>>
>> For starters, you can maybe check if that was set appropriately.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Re: FlinkKinesisProducer blocking ?

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Thanks,Gordon for your reply.

I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
So, it should just be dropping records being produced from the
80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
do not want backpressure as you said it effectively blocks all upstream
operators.

But from what you are saying, it will apply backpressure when the number of
outstanding records accumulated exceeds the default queue limit of 2147483647
or* does it also do it if it is r**ate-limited* *to 1MB per second per
shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
probable.

So, calculating Queue Limit:
Based on this, my records size = 1600 bytes. I have 96 shards
Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size of
100kB per shard should be sufficient.So, Queue size/shard=100KB
Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25

Acc. to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Vijay,
>
> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
> It does however apply backpressure (therefore effectively blocking all
> upstream operators) when the number of outstanding records accumulated
> exceeds a set limit, configured using the
> FlinkKinesisProducer#setQueueLimit
> method.
>
> For starters, you can maybe check if that was set appropriately.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: FlinkKinesisProducer blocking ?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Vijay,

The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
It does however apply backpressure (therefore effectively blocking all
upstream operators) when the number of outstanding records accumulated
exceeds a set limit, configured using the FlinkKinesisProducer#setQueueLimit
method.

For starters, you can maybe check if that was set appropriately.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/