You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Chad Preisler <ch...@gmail.com> on 2022/01/31 22:18:45 UTC

Re: Kafka Consumer Fairness when fetching events from different partitions.

Hello,

I got this from the JavaDocs for KafkaConsumer.

 * If a consumer is assigned multiple partitions to fetch data from, it
will try to consume from all of them at the same time,
 * effectively giving these partitions the same priority for consumption.
However in some cases consumers may want to
 * first focus on fetching from some subset of the assigned partitions at
full speed, and only start fetching other partitions
 * when these partitions have few or no data to consume.

*  One of such cases is stream processing, where processor fetches from two
topics and performs the join on these two streams.
 * When one of the topics is long lagging behind the other, the processor
would like to pause fetching from the ahead topic
 * in order to get the lagging stream to catch up. Another example is
bootstraping upon consumer starting up where there are
 * a lot of history data to catch up, the applications usually want to get
the latest data on some of the topics before consider
 * fetching other topics.

I'm testing a consumer now. When the topic being read has the following lag.

consumer group partition: 0, offset: 254, lag: 12301
consumer group partition: 1, offset: 302, lag: 12216
consumer group partition: 2, offset: 300, lag: 12257
consumer group partition: 3, offset: 259, lag: 12108

My consumer is starting with partition 3 and catching all the way up, then
it starts reading the rest of the partitions evenly. I'm not sure why it is
happening that way.

Hope this helps.





On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
mazen.ezzeddine@etu.univ-cotedazur.fr> wrote:

> Dear all,
>
> Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> P3 respectively. And let’s suppose that num.poll.records (the maximum
> number of records that can be fetched from the broker ) is equal to 100.
>
> If the consumer sends a request to fetch records from P1, P2, P3,  is
> there any guarantee that the returned records will be fairly/uniformly
> selected out of the available partitions e.g., say 34 records from P1, 33
> from P2 and 33 from P3.
>
> Otherwise, how the decision on the returned records is handled (e.g., is
> it based on the first partition  leader that replies to the fetch request
> e.g., say P1..). In such case how eventual fairness is guaranteed across
> different partitions,  in case for example when records happen to be
> fetched/read from a single partition.
>
> Thank you.
>
>

Re: Kafka Consumer Fairness when fetching events from different partitions.

Posted by Mazen Ezzeddine <ma...@etu.univ-cotedazur.fr>.
Hello Chen and Edward ,

I have went as well through the  documentation in https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP41:KafkaConsumerMaxRecords-EnsuringFairConsumption

And it turns out that the consumer polls in a greedy round-robin algorithm. Confirming perhaps what Chad has observed experimentally (within the same poll: prioritize fetching events from the same partition).

This means that considering the example in my initial question :  all 100 messages will be fetched from the first partition P1 on the first poll. On the second poll, the consumer will poll from P2 all the messages there, since there are 50 messages on that partition which is less than max.poll.records.  it will as  poll 50 more messages from P3.

So in the first poll priority will be given to P1 (up to max.poll.records out of P1), if P1 has less than max.poll.records events, only then events from P2 will be fetched and so on.
However, on the next poll priority will be given to the set of partitions not being visited (fetched from) in the last poll and so on.

I hope that someone from Confluent could confirm and/or correct/comment this thread. Indeed, random fetching or greedy polling with being inter-poll unfair (always prioritize the same partitions) could lead to starvation of some partitions. Hence, theoretically there might unbounded latency on some events, and no latency SLA can be granted (I am working on kafka event latency SLA research project ).

Thank you.

________________________________
From: Edward Capriolo <ed...@gmail.com>
Sent: Monday, January 31, 2022 11:28 PM
To: users@kafka.apache.org <us...@kafka.apache.org>
Subject: Re: Kafka Consumer Fairness when fetching events from different partitions.

On Monday, January 31, 2022, Chad Preisler <ch...@gmail.com> wrote:

> Hello,
>
> I got this from the JavaDocs for KafkaConsumer.
>
>  * If a consumer is assigned multiple partitions to fetch data from, it
> will try to consume from all of them at the same time,
>  * effectively giving these partitions the same priority for consumption.
> However in some cases consumers may want to
>  * first focus on fetching from some subset of the assigned partitions at
> full speed, and only start fetching other partitions
>  * when these partitions have few or no data to consume.
>
> *  One of such cases is stream processing, where processor fetches from two
> topics and performs the join on these two streams.
>  * When one of the topics is long lagging behind the other, the processor
> would like to pause fetching from the ahead topic
>  * in order to get the lagging stream to catch up. Another example is
> bootstraping upon consumer starting up where there are
>  * a lot of history data to catch up, the applications usually want to get
> the latest data on some of the topics before consider
>  * fetching other topics.
>
> I'm testing a consumer now. When the topic being read has the following
> lag.
>
> consumer group partition: 0, offset: 254, lag: 12301
> consumer group partition: 1, offset: 302, lag: 12216
> consumer group partition: 2, offset: 300, lag: 12257
> consumer group partition: 3, offset: 259, lag: 12108
>
> My consumer is starting with partition 3 and catching all the way up, then
> it starts reading the rest of the partitions evenly. I'm not sure why it is
> happening that way.
>
> Hope this helps.
>
>
>
>
>
> On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
> mazen.ezzeddine@etu.univ-cotedazur.fr> wrote:
>
> > Dear all,
> >
> > Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> > events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> > P3 respectively. And let’s suppose that num.poll.records (the maximum
> > number of records that can be fetched from the broker ) is equal to 100.
> >
> > If the consumer sends a request to fetch records from P1, P2, P3,  is
> > there any guarantee that the returned records will be fairly/uniformly
> > selected out of the available partitions e.g., say 34 records from P1, 33
> > from P2 and 33 from P3.
> >
> > Otherwise, how the decision on the returned records is handled (e.g., is
> > it based on the first partition  leader that replies to the fetch request
> > e.g., say P1..). In such case how eventual fairness is guaranteed across
> > different partitions,  in case for example when records happen to be
> > fetched/read from a single partition.
> >
> > Thank you.
> >
> >
>

What I have noticed anecdotally. The order is random. Two consumers reading
the same messages from the same group will get messages in different orders.

Also if you get backlogged and partitions have depth  you tend to get all
the data from a partition before it moves onto the next. But this behavior
is likely very version and client dependent.

The order you consume shouldn't matter but in practice everything matters
at least a little evit to someone.


--
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.

Re: Kafka Consumer Fairness when fetching events from different partitions.

Posted by Edward Capriolo <ed...@gmail.com>.
On Monday, January 31, 2022, Chad Preisler <ch...@gmail.com> wrote:

> Hello,
>
> I got this from the JavaDocs for KafkaConsumer.
>
>  * If a consumer is assigned multiple partitions to fetch data from, it
> will try to consume from all of them at the same time,
>  * effectively giving these partitions the same priority for consumption.
> However in some cases consumers may want to
>  * first focus on fetching from some subset of the assigned partitions at
> full speed, and only start fetching other partitions
>  * when these partitions have few or no data to consume.
>
> *  One of such cases is stream processing, where processor fetches from two
> topics and performs the join on these two streams.
>  * When one of the topics is long lagging behind the other, the processor
> would like to pause fetching from the ahead topic
>  * in order to get the lagging stream to catch up. Another example is
> bootstraping upon consumer starting up where there are
>  * a lot of history data to catch up, the applications usually want to get
> the latest data on some of the topics before consider
>  * fetching other topics.
>
> I'm testing a consumer now. When the topic being read has the following
> lag.
>
> consumer group partition: 0, offset: 254, lag: 12301
> consumer group partition: 1, offset: 302, lag: 12216
> consumer group partition: 2, offset: 300, lag: 12257
> consumer group partition: 3, offset: 259, lag: 12108
>
> My consumer is starting with partition 3 and catching all the way up, then
> it starts reading the rest of the partitions evenly. I'm not sure why it is
> happening that way.
>
> Hope this helps.
>
>
>
>
>
> On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
> mazen.ezzeddine@etu.univ-cotedazur.fr> wrote:
>
> > Dear all,
> >
> > Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> > events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> > P3 respectively. And let’s suppose that num.poll.records (the maximum
> > number of records that can be fetched from the broker ) is equal to 100.
> >
> > If the consumer sends a request to fetch records from P1, P2, P3,  is
> > there any guarantee that the returned records will be fairly/uniformly
> > selected out of the available partitions e.g., say 34 records from P1, 33
> > from P2 and 33 from P3.
> >
> > Otherwise, how the decision on the returned records is handled (e.g., is
> > it based on the first partition  leader that replies to the fetch request
> > e.g., say P1..). In such case how eventual fairness is guaranteed across
> > different partitions,  in case for example when records happen to be
> > fetched/read from a single partition.
> >
> > Thank you.
> >
> >
>

What I have noticed anecdotally. The order is random. Two consumers reading
the same messages from the same group will get messages in different orders.

Also if you get backlogged and partitions have depth  you tend to get all
the data from a partition before it moves onto the next. But this behavior
is likely very version and client dependent.

The order you consume shouldn't matter but in practice everything matters
at least a little evit to someone.


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.