Posted to dev@kafka.apache.org by Eevee <ev...@gmail.com> on 2020/11/30 02:48:18 UTC
Sticky Partitioner
Hi all,
I've noticed a couple of edge cases in the Sticky Partitioner and I'd like
to discuss introducing a new KIP to fix them.
Behavior
1. Low throughput producers
The first edge case occurs when a broker becomes temporarily unavailable
for a period less than replica.lag.time.max.ms. If you have a low
throughput producer generating records without a key and using a small
value of linger.ms you will quickly hit the
max.in.flight.requests.per.connection limit for that broker or another
broker which depends on the unavailable broker to achieve acks=all.
At this point, all records will be redirected to whichever broker hits
max.in.flight.requests.per.connection first and if the producer has low
enough throughput compared to batch.size this will result in no records
being sent to any broker until the failing broker becomes available
again. Effectively this transforms a short broker failure into a cluster
failure. Ideally, we'd see all records redirected away from these
brokers rather than to them.
2. Overwhelmed brokers
The second edge case occurs when an individual broker begins
underperforming and cannot keep up with the producers. Once the broker
hits max.in.flight.requests.per.connection the producer will begin
redirecting all records without keys to that broker. This results in a
disproportionate percentage of the cluster load going to the failing
broker and begins a death spiral in which the broker becomes more and
more overwhelmed, resulting in the producers redirecting more and more
of the cluster's load towards it.
Proposed Changes
We need a solution which fixes the interaction between the back pressure
mechanism max.in.flight.requests.per.connection and the sticky
partitioner.
My current thought is we should remove partitions associated with
brokers which have hit max.in.flight.requests.per.connection from the
available choices for the sticky partitioner. Once they are below
max.in.flight.requests.per.connection they'd then be added back into the
available partition list.
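
To make the filtering idea concrete, here is a minimal sketch in plain
Java. It is not Kafka client code: the class name, the method and both
maps are hypothetical, and it simply assumes the per-broker in-flight
counts can be handed to whatever picks the sticky partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; nothing here is part of the
// Kafka producer API.
final class InFlightAwarePartitionFilter {

    /**
     * Returns the partitions whose leader broker is still below the
     * in-flight request limit, sorted by partition id.
     *
     * @param leaderByPartition partition id -> broker id of its leader (assumed input)
     * @param inFlightByBroker  broker id -> unacknowledged requests (assumed input)
     * @param maxInFlight       value of max.in.flight.requests.per.connection
     */
    static List<Integer> eligiblePartitions(Map<Integer, Integer> leaderByPartition,
                                            Map<Integer, Integer> inFlightByBroker,
                                            int maxInFlight) {
        List<Integer> eligible = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : leaderByPartition.entrySet()) {
            // A broker with no recorded requests is treated as idle.
            int inFlight = inFlightByBroker.getOrDefault(entry.getValue(), 0);
            if (inFlight < maxInFlight) {
                eligible.add(entry.getKey());
            }
        }
        Collections.sort(eligible);
        return eligible;
    }
}
```

A partition drops out of the list while its leader is at the limit and
reappears as soon as one of that broker's requests completes, since the
list is recomputed from the current counts each time.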
My one concern is that this could cause further edge case behavior for
producers with small values of linger.ms. In particular I could see a
scenario in which the producer hits
max.in.flight.requests.per.connection for all brokers and then blocks on
send() until a request returns rather than building up a new batch. It's
possible (I'd need to investigate the send loop further) the producer
could create a new batch as soon as a request arrives, add a single
record to it and immediately send it then block on send() again. This
would result in the producer doing next to no batching and limiting its
throughput drastically.
If this is the case, I figure we can allow the sticky partitioner to use
all partitions if all brokers are at
max.in.flight.requests.per.connection. In such a case it would add
records to a single partition until a request completed or it hit
batch.size and then pick a new partition at random.
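
The fallback could look roughly like the sketch below. Again this is
plain Java with hypothetical names, not the producer's real code. It
only covers the moment a new sticky partition is chosen, and assumes the
caller already knows which partitions sit on brokers below the limit.

```java
import java.util.List;
import java.util.Random;

// Hypothetical helper for illustration only; nothing here is part of the
// Kafka producer API.
final class StickyFallbackChooser {

    /**
     * Picks the next sticky partition at random. Partitions on brokers
     * below the in-flight limit are preferred; if there are none, every
     * partition becomes a candidate so a batch can keep filling while the
     * producer waits for a request to complete.
     *
     * @param allPartitions every partition of the topic (must be non-empty)
     * @param eligible      partitions whose broker is below the limit (assumed input)
     * @param random        source of randomness, passed in so it can be seeded
     */
    static int nextStickyPartition(List<Integer> allPartitions,
                                   List<Integer> eligible,
                                   Random random) {
        List<Integer> candidates = eligible.isEmpty() ? allPartitions : eligible;
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

The caller would keep using the returned partition until a request
completes or the batch reaches batch.size, and only then call this again.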
Feedback
Before writing a KIP I'd love to hear people's feedback, alternatives and
concerns.
Regards,
Evelyn.
Re: Sticky Partitioner
Posted by Justine Olshan <jo...@confluent.io>.
Hi Evelyn,
Thanks for taking a look at improving the sticky partitioner! These edge
cases seem like they would cause quite a bit of trouble.
I think the idea to check for max.in.flight.requests.per.connection is a
good one, but one concern I have is how this information will be available
to the partitioner.
Justine