You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Eevee <ev...@gmail.com> on 2020/11/30 02:48:18 UTC

Sticky Partitioner

Hi all,

I've noticed a couple edge cases in the Sticky Partitioner and I'd like 
to discuss introducing a new KIP to fix it.

Behavior
1. Low throughput producers
The first edge case occurs when a broker becomes temporarily unavailable 
for a period less then replica.lag.time.max.ms. If you have a low 
throughput producer generating records without a key and using a small 
value of linger.ms you will quickly hit the 
max.in.flight.requests.per.connection limit for that broker or another 
broker which depends on the unavailable broker to achieve acks=all.
At this point, all records will be redirected to whichever broker hits 
max.in.flight.requests.per.connection first and if the producer has low 
enough throughput compared to batch.size this will result in no records 
being sent to any broker until the failing broker becomes available 
again. Effectively this transforms a short broker failure into a cluster 
failure. Ideally, we'd rather see all records redirected away from these 
brokers rather then too them. 2. Overwhelmed brokers The second edge 
case occurs when an individual broker begins under performing and cannot 
keep up with the producers. Once the broker hits 
max.in.flight.requests.per.connection the producer will begin to 
redirecting all records without keys to the broker. This results in a 
disproportionate percentage of the cluster load going to the failing 
broker and begins a death spiral in which the broker becomes more and 
more overwhelmed resulting in the producers redirecting more and more of 
the clusters load towards it.Proposed Changes We need a solution which 
fixes the interaction between the back pressure mechanism 
max.in.flight.requests.per.connection and the sticky partitioner.

My current thought is we should remove partitions associated with 
brokers which have hit max.in.flight.requests.per.connection from the 
available choices for the sticky partitioners. Once they are below 
max.in.flight.requests.per.connection they'd then be added back into the 
available partition list.

My one concern is that this could cause further edge case behavior for 
producers with small values of linger.ms. In particular I could see a 
scenario in which the producer hits 
max.in.flight.requests.per.connection for all brokers and then blocks on 
send() until a request returns rather then building up a new batch. It's 
possible (I'd need to investigate the send loop further) the producer 
could create a new batch as soon as a request arrives, add a single 
record to it and immediately send it then block on send() again. This 
would result in the producer doing near to no batching and limiting it's 
throughput drastically.

If this is the case, I figure we can allow the sticky partitioner to use 
all partitions if all brokers are at 
max.in.flight.requests.per.connection. In such a case it would add 
records to a single partition until a request completed or it hit 
batch.size and then picked a new partition at random.

Feedback
Before writing a KIP I'd love to hear peoples feedback, alternatives and 
concerns.

Regards,
Evelyn.



Re: Sticky Partitioner

Posted by Justine Olshan <jo...@confluent.io>.
Hi Evelyn,

Thanks for taking a look at improving the sticky partitioner! These edge
cases seem like they would cause quite a bit a trouble.
I think the idea to check for max.in.flight.requests.per.connection is a
good one, but one concern I have is how this information will be available
to the partitioner.

Justine

On Mon, Nov 30, 2020 at 7:10 AM Eevee <ev...@gmail.com> wrote:

> Hi all,
>
> I've noticed a couple edge cases in the Sticky Partitioner and I'd like
> to discuss introducing a new KIP to fix it.
>
> Behavior
> 1. Low throughput producers
> The first edge case occurs when a broker becomes temporarily unavailable
> for a period less then replica.lag.time.max.ms. If you have a low
> throughput producer generating records without a key and using a small
> value of linger.ms you will quickly hit the
> max.in.flight.requests.per.connection limit for that broker or another
> broker which depends on the unavailable broker to achieve acks=all.
> At this point, all records will be redirected to whichever broker hits
> max.in.flight.requests.per.connection first and if the producer has low
> enough throughput compared to batch.size this will result in no records
> being sent to any broker until the failing broker becomes available
> again. Effectively this transforms a short broker failure into a cluster
> failure. Ideally, we'd rather see all records redirected away from these
> brokers rather then too them. 2. Overwhelmed brokers The second edge
> case occurs when an individual broker begins under performing and cannot
> keep up with the producers. Once the broker hits
> max.in.flight.requests.per.connection the producer will begin to
> redirecting all records without keys to the broker. This results in a
> disproportionate percentage of the cluster load going to the failing
> broker and begins a death spiral in which the broker becomes more and
> more overwhelmed resulting in the producers redirecting more and more of
> the clusters load towards it.Proposed Changes We need a solution which
> fixes the interaction between the back pressure mechanism
> max.in.flight.requests.per.connection and the sticky partitioner.
>
> My current thought is we should remove partitions associated with
> brokers which have hit max.in.flight.requests.per.connection from the
> available choices for the sticky partitioners. Once they are below
> max.in.flight.requests.per.connection they'd then be added back into the
> available partition list.
>
> My one concern is that this could cause further edge case behavior for
> producers with small values of linger.ms. In particular I could see a
> scenario in which the producer hits
> max.in.flight.requests.per.connection for all brokers and then blocks on
> send() until a request returns rather then building up a new batch. It's
> possible (I'd need to investigate the send loop further) the producer
> could create a new batch as soon as a request arrives, add a single
> record to it and immediately send it then block on send() again. This
> would result in the producer doing near to no batching and limiting it's
> throughput drastically.
>
> If this is the case, I figure we can allow the sticky partitioner to use
> all partitions if all brokers are at
> max.in.flight.requests.per.connection. In such a case it would add
> records to a single partition until a request completed or it hit
> batch.size and then picked a new partition at random.
>
> Feedback
> Before writing a KIP I'd love to hear peoples feedback, alternatives and
> concerns.
>
> Regards,
> Evelyn.
>
>
>