Posted to user@flink.apache.org by Steve Bistline <sr...@gmail.com> on 2018/11/09 14:34:32 UTC

FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

I am getting this error from the Flink Kinesis Connector. I have a native
KCL app running in parallel with no problems.


Any help would be appreciated


Thanks so much!!


Steve


flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:11,579
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 258 millis (Rate
exceeded for shard shardId-000000000000 in stream CSV under account
xxxxxxxxx  . (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
e1c0caa4-8c4c-7738-b59f-4977bc762cf3))

flink-sbistl919-taskexecutor-0-CACSVML-15736.log:2018-11-09 07:46:16,844
WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy  -
Got recoverable SdkClientException. Backing off for 203 millis (Rate
exceeded for shard shardId-000000000001 in stream CSV under account
xxxxxxxxx. (Service: AmazonKinesis; Status Code: 400; Error Code:
ProvisionedThroughputExceededException; Request ID:
f7d22c26-96f6-c547-a38d-affe493cd2e1))

Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Gordon,

Thanks for the reply.

So is it correct to say that the KPL RateLimit is not enforced when the
sink parallelism is >1? If multiple subtasks write to the same shard and
each has its own RateLimit, their combined rate can exceed the limit.

If that's the case, can you suggest a way to overcome this?

Thanks,
Rafi


On Tue, Nov 13, 2018 at 6:27 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi all,
>
> I think Steve's occurrence of the warning was from the consumer side.
>
> For the Flink Kinesis Consumer, this could most likely occur due to
> excessive ListShards API calls on the target Kinesis stream. The consumer
> uses this API to discover shards, at a fixed interval.
> The problem with the current design is that all subtasks of the consumer
> would try to discover shards, and therefore during the discovery, it may be
> possible that AWS's service rate limit is hit.
> The community is well aware of this shortcoming, and AFAIK, we have some
> plans to address this for Flink 1.8 / 1.9.
>
> @Rafi, as for the producer side, you may want to take a look at providing a
> FlinkKinesisPartitioner. By default, this is a round-robin partitioning of
> the records, i.e. records received by a subtask of the Kinesis sink can end
> up in any of the Kinesis shards.
>
> Cheers,
> Gordon

Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi all,

I think Steve's occurrence of the warning was from the consumer side.

For the Flink Kinesis Consumer, this could most likely occur due to
excessive ListShards API calls on the target Kinesis stream. The consumer
uses this API to discover shards, at a fixed interval.
The problem with the current design is that all subtasks of the consumer
would try to discover shards, and therefore during the discovery, it may be
possible that AWS's service rate limit is hit.
The community is well aware of this shortcoming, and AFAIK, we have some
plans to address this for Flink 1.8 / 1.9.
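
A minimal sketch of the consumer-side settings that relate to this, assuming
the property keys below match ConsumerConfigConstants in your connector
version (please verify before use). It only builds a java.util.Properties
object; the class name, method name and example intervals are illustrative,
not recommendations. A longer discovery interval reduces how often each
subtask calls the shard-listing API, and a longer GetRecords interval reduces
per-shard read calls, which may matter when another application (such as a
KCL app) reads the same shards.

```java
import java.util.Properties;

class ConsumerThrottleConfig {

    // Builds consumer properties. The keys are assumed to match the
    // connector's ConsumerConfigConstants; check them against your version.
    static Properties build(long discoveryIntervalMillis, long getRecordsIntervalMillis) {
        if (discoveryIntervalMillis <= 0 || getRecordsIntervalMillis < 0) {
            throw new IllegalArgumentException("intervals must not be negative or zero");
        }
        Properties props = new Properties();
        // How often each consumer subtask looks for new shards.
        props.setProperty("flink.shard.discovery.intervalmillis",
                Long.toString(discoveryIntervalMillis));
        // Pause between successive GetRecords calls on a shard.
        props.setProperty("flink.shard.getrecords.intervalmillis",
                Long.toString(getRecordsIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: discover shards once a minute, poll once a second.
        Properties props = build(60_000L, 1_000L);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties would be passed, together with the AWS region and
credentials settings, as the configuration argument of the
FlinkKinesisConsumer constructor.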

@Rafi, as for the producer side, you may want to take a look at providing a
FlinkKinesisPartitioner. By default, this is a round-robin partitioning of
the records, i.e. records received by a subtask of the Kinesis sink can end
up in any of the Kinesis shards.
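
One way a custom partitioner could keep each sink subtask on its own part of
the stream is to derive an explicit hash key from the subtask index. The
sketch below is dependency-free and shows only the arithmetic. It assumes
that the connector's partitioner base class (KinesisPartitioner<T> in the
versions I am aware of, to be checked against yours) lets you return an
explicit hash key and learn the subtask index, that the shards split the
128-bit hash key space evenly, and that the sink parallelism equals the
shard count. The class and method names here are hypothetical.

```java
import java.math.BigInteger;

class SubtaskHashKey {

    // Kinesis hash keys are 128-bit unsigned integers: [0, 2^128).
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // Returns the midpoint of the hash-key slice owned by this subtask,
    // as the decimal string form that an explicit hash key uses.
    static String explicitHashKeyFor(int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("need 0 <= subtaskIndex < parallelism");
        }
        BigInteger sliceWidth = HASH_SPACE.divide(BigInteger.valueOf(parallelism));
        BigInteger sliceStart = sliceWidth.multiply(BigInteger.valueOf(subtaskIndex));
        return sliceStart.add(sliceWidth.shiftRight(1)).toString();
    }

    public static void main(String[] args) {
        int parallelism = 2; // example value
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + explicitHashKeyFor(i, parallelism));
        }
    }
}
```

With this mapping every record from a given subtask lands in one shard, so a
single KPL instance is the only writer to that shard. It trades away the
ability to balance load across shards, so it is a sketch of the idea rather
than a recommendation.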

Cheers,
Gordon

On Mon, Nov 12, 2018 at 8:54 PM Rafi Aroch <ra...@gmail.com> wrote:

> Hi Steve,
>
> We've encountered this also. We have way more than enough shards, but were
> still getting exceptions.
> We think we know what is the reason, we would love for someone to
> approve/reject.
>
> What we suspect is happening is as follows:
>
> The KPL's RateLimit parameter is tracking the amount of bytes/records
> written into a specific shard.
> If the parallelism of your Sink is >1 (which is probably the case),
> multiple tasks == multiple KPL instances which may be writing to the same
> shard.
> So for each individual KPL the RateLimit is not breached, but if multiple
> parallel tasks are writing to the same shard the RateLimit gets breached
> and a ProvisionedThroughputExceededException is being thrown.
>
> What we've tried:
>
>    - Using a random partition key to spread the load evenly between the
>    shards. This did not work for us...
>    - We tried to make records being written to the same shards by the
>    same KPL instance, so the RateLimit would get enforced. We did a keyBy
>    before the Sink to ensure same records go to the same task and using the
>    same keyBy logic as the Kinesis partitionKey. This did not work for us...
>
> What solved it eventually:
>
> Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a
> queueSize so that we'll get back-pressured in case of high load (without
> getting ProvisionedThroughputExceededException exceptions). This
> solved the problem and currently is not a bottleneck for us, but can be
> soon. So this is not a real solution.
>
> Can anyone suggest a better solution? Approve/reject our assumption?
>
> Thanks
> Rafi

Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Steve,

We've encountered this too. We have far more than enough shards, but were
still getting exceptions.
We think we know the reason and would love for someone to confirm or
reject it.

What we suspect is happening is as follows:

The KPL's RateLimit parameter tracks the number of bytes/records written
to a specific shard.
If the parallelism of your sink is >1 (which is probably the case), there
are multiple tasks and therefore multiple KPL instances, which may be
writing to the same shard.
So no individual KPL instance breaches the RateLimit, but when multiple
parallel tasks write to the same shard, their combined rate breaches it
and a ProvisionedThroughputExceededException is thrown.
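
The arithmetic behind this suspicion can be sketched as follows, assuming
RateLimit is a percentage of the per-shard limit and is enforced separately
by each KPL instance. In the worst case all subtasks write to the same
shard, so the combined rate is the per-instance RateLimit multiplied by the
sink parallelism. The helper below (hypothetical names) divides a target
percentage across subtasks and puts the result under the "RateLimit" key,
which is assumed to be the KPL configuration name that the producer's
Properties accept.

```java
import java.util.Properties;

class KplRateLimitBudget {

    // Per-subtask RateLimit (percent) such that, even if every subtask
    // writes to the same shard, the sum stays within targetPercent.
    static long perSubtaskRateLimit(long targetPercent, int sinkParallelism) {
        if (targetPercent <= 0 || sinkParallelism <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        // Never go below 1 percent, even when parallelism exceeds the target.
        return Math.max(1L, targetPercent / sinkParallelism);
    }

    // Producer properties carrying only the derived RateLimit.
    static Properties producerConfig(long targetPercent, int sinkParallelism) {
        Properties props = new Properties();
        props.setProperty("RateLimit",
                Long.toString(perSubtaskRateLimit(targetPercent, sinkParallelism)));
        return props;
    }

    public static void main(String[] args) {
        // Example: 4 sink subtasks sharing 100% of a shard's limit.
        System.out.println(producerConfig(100L, 4));
    }
}
```

This budget is deliberately pessimistic: when records are spread evenly over
many shards, it caps each subtask well below what the stream could absorb.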

What we've tried:

   - Using a random partition key to spread the load evenly across the
   shards. This did not work for us.
   - Making sure that all records for a given shard are written by the same
   KPL instance, so that the RateLimit would be enforced. We did a keyBy
   before the sink so that records with the same key go to the same task,
   and used that same key as the Kinesis partitionKey. This did not work
   for us either.

What solved it eventually:

Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a
queueSize so that we get back-pressured under high load (instead of
getting ProvisionedThroughputExceededException errors). This solved the
problem and is not currently a bottleneck for us, but it may become one
soon, so it is not a real solution.
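
For sizing such a queue, one rule of thumb is: queue limit = (number of
shards * records buffered per shard) / sink parallelism, where the records
buffered per shard follow from the shard's write throughput, the desired
buffering time and the average record size. The sketch below only does that
arithmetic. It assumes a per-shard write limit of 1 MB/s, and the names are
hypothetical; the result would be handed to the producer's queue-limit
setter (setQueueLimit in the connector versions I am aware of, to be
verified).

```java
class QueueLimitEstimate {

    // Assumed per-shard write limit in bytes per second.
    static final long SHARD_BYTES_PER_SECOND = 1_000_000L;

    // Estimated number of records each sink subtask may keep queued.
    static int queueLimit(int shards, int avgRecordBytes,
                          double bufferSeconds, int sinkParallelism) {
        if (shards <= 0 || avgRecordBytes <= 0 || bufferSeconds <= 0 || sinkParallelism <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        // Records one shard can absorb during the buffering window.
        long recordsPerShard = (long) (SHARD_BYTES_PER_SECOND * bufferSeconds) / avgRecordBytes;
        // Spread the total across the sink subtasks.
        long perSubtask = (shards * recordsPerShard) / sinkParallelism;
        return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, perSubtask));
    }

    public static void main(String[] args) {
        // Example: 4 shards, 1 KB records, 1 second of buffering, 2 subtasks.
        System.out.println(queueLimit(4, 1000, 1.0, 2));
    }
}
```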

Can anyone suggest a better solution? Approve/reject our assumption?

Thanks
Rafi


On Sat, Nov 10, 2018, 03:02 shkob1 <sh...@gmail.com> wrote:

> If it's running in parallel aren't you just adding readers which maxes out
> your provisioned throughput? probably doesn't belong in here but rather a
> Kinesis thing, but i suggest increasing your number of shards?

Re: FLINK Kinesis Connector Error - ProvisionedThroughputExceededException

Posted by shkob1 <sh...@gmail.com>.
If it's running in parallel, aren't you just adding readers, which maxes
out your provisioned throughput? This is probably a Kinesis question
rather than a Flink one, but I suggest increasing your number of shards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/