Posted to dev@kafka.apache.org by ChienHsing Wu <ch...@opentext.com> on 2018/11/20 19:45:36 UTC

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi Matt,

Thanks for the feedback. 

The issue with the current design is that it stays on the previous partition even if the last poll call consumes the max.poll.records; it will consume all records in that partition available at the consumer side to serve multiple poll calls before moving to the next partition. 
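
The behavior described above can be sketched with a small simulation. This is only a toy model, not the Fetcher code: the function name poll_sequence, the record counts, and the simplification that each poll call is served from a single partition are all assumptions made for illustration.

```python
from collections import deque

def poll_sequence(buffered, max_poll_records, rotate):
    """Return the partition that serves each simulated poll call.

    buffered: dict mapping partition name -> number of buffered records.
    rotate:   False keeps draining the same partition across poll calls
              (the behavior described above); True moves a partition to
              the back of the queue after each poll call.
    """
    queue = deque(buffered.items())  # stands in for the completed-fetch queue
    served = []
    while queue:
        partition, remaining = queue.popleft()
        remaining -= min(remaining, max_poll_records)
        served.append(partition)
        if remaining > 0:
            if rotate:
                queue.append((partition, remaining))      # go to the back
            else:
                queue.appendleft((partition, remaining))  # stay in front
    return served

current = poll_sequence({"A": 300, "B": 300}, 100, rotate=False)
proposed = poll_sequence({"A": 300, "B": 300}, 100, rotate=True)
print(current)   # ['A', 'A', 'A', 'B', 'B', 'B']
print(proposed)  # ['A', 'B', 'A', 'B', 'A', 'B']
```

With rotate=False one partition is drained over several poll calls before the next one is touched; with rotate=True consecutive poll calls alternate between the partitions that have buffered records.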

Introducing another threshold at partition level will decrease the number of records consumed in one partition within one poll call but will still use that same partition as the starting one in the next poll call. 

The same effect can be achieved by setting max.poll.records to 100, I believe. The main difference is that the client will need to make more poll calls when that value is set to 100, and because poll is non-blocking I believe the cost of the extra poll calls is not significant.

Further thoughts?

Thanks, CH

-----Original Message-----
From: Matt Farmer <ma...@frmr.me> 
Sent: Monday, November 19, 2018 9:32 PM
To: dev@kafka.apache.org
Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi there,

Thanks for the KIP.

We’ve run into issues with this at Mailchimp so something to address consuming behavior would save us from having to always ensure we’re running enough consumers that each consumer has only one partition (which is our usual MO).

I wonder though if it would be simpler and more powerful to define the maximum number of records the consumer should pull from one partition before pulling some records from another?

So if you set max.poll.records to 500 and then some new setting, max.poll.records.per.partition, to 100 then the Consumer would switch what partition it reads from every 100 records - looping back around to the first partition that had records if there aren’t 5 or more partitions with records.
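
A rough sketch of how such a per-partition cap could behave within one poll call is shown here. max.poll.records.per.partition is only the setting proposed in this message, not an existing consumer config, and poll_once and the record counts are hypothetical names and values chosen for illustration.

```python
from collections import deque

def poll_once(buffered, max_poll_records, max_per_partition):
    """Simulate one poll call with a per-partition cap.

    Takes up to max_per_partition records from a partition, then switches
    to the next partition that has records, looping back around until
    max_poll_records is reached or nothing is left.

    buffered maps partition -> buffered record count and is updated in place.
    Returns the (partition, count) chunks in the order they were served.
    """
    queue = deque(p for p, n in buffered.items() if n > 0)
    chunks = []
    budget = max_poll_records
    while queue and budget > 0:
        partition = queue.popleft()
        taken = min(buffered[partition], max_per_partition, budget)
        buffered[partition] -= taken
        budget -= taken
        chunks.append((partition, taken))
        if buffered[partition] > 0:
            queue.append(partition)  # revisit after the other partitions
    return chunks

chunks = poll_once({"A": 300, "B": 300, "C": 50}, 500, 100)
print(chunks)
# [('A', 100), ('B', 100), ('C', 50), ('A', 100), ('B', 100), ('A', 50)]
```

With fewer than five partitions holding records, the loop wraps back to the first partition, as described above.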

What do you think?

On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Hi, could anyone please review this KIP?
>
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Friday, November 09, 2018 1:10 PM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across 
> Partitions in KafkaConsumer
>
> Just to check: Will anyone review this? It's been silent for a week...
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Monday, November 05, 2018 4:18 PM
> To: 'dev@kafka.apache.org' <dev@kafka.apache.org>
> Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions 
> in KafkaConsumer
>
> Hi I just put together the KIP page as requested. This email is to 
> start the discussion thread.
>
> KIP: KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer>
> Pull Request: https://github.com/apache/kafka/pull/5838
> Jira: https://issues.apache.org/jira/browse/KAFKA-3932
>
> Thanks, CH
>

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
I beg to differ... The original KIP intended to consume messages fairly, and the current implementation can be improved with a patch that would benefit the community. I believe that fair consumption is an important characteristic at the consumer side. But anyhow, it looks like this was not received well enough.

--CH

-----Original Message-----
From: Colin McCabe <cm...@apache.org> 
Sent: Tuesday, January 29, 2019 4:00 PM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

On Mon, Jan 28, 2019, at 06:57, ChienHsing Wu wrote:
> So... Does this non-response mean I should drop this topic after 
> almost one month, folks?

Hi ChienHsing,

I would recommend dropping it, since I don't see a lot of uses for it.  Maybe there is something I missed, though.  See my responses below:

> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Monday, January 21, 2019 12:47 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi all, not sure what to do next as weeks have gone by, guys. --CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Monday, January 14, 2019 9:13 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi,
> 
> I know everyone is busy. But I would appreciate someone letting me 
> know what to do next. I started this effort back in early November 
> last year...
> 
> Thanks, CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Monday, January 07, 2019 9:24 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi guys,
> 
> I am not sure what to do next in this KIP process. Could anyone please 
> help/advise me on what to do next?
> 
> Thanks, CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, January 02, 2019 12:55 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi Colin,
> 
> Setting max.partition.fetch.bytes was discussed in the ticket. It's 
> not as desirable if the message size is highly variable. Also, this 
> decreases the efficiency of network communication.

Even if the message size is highly variable, you can still set max.partition.fetch.bytes to a very small value, perhaps even 1.  Since we always return at least one record, regardless of size limits, this will have the effect you want.  I agree that this may be less efficient than fetching a larger number of records.  But it will get you true round-robin behavior.

> 
> In the case you mentioned below where a consumer can get messages from 
> A, B, C and D but the consumer currently only has messages from A, B 
> and C, the proposed change will NOT wait until some messages from D 
> arrive to start returning messages; it will just serve those from A, 
> B and C. It will include those from D when they are available. That IS 
> the current behavior. The proposed change does not impose a strict 
> round robin pattern.

I guess this is debatable, but the behavior you're proposing doesn't seem "fairer" than what we do now.  Imagine you are subscribed to 100 partitions and each fetch request only gives you back records from 3 of them.  Then the order the consumer sees records might be something like:

ABCABCABCDEFDEFDEFDEFGHIGHIGHI... etc.

Is this fairer than AAABBBCCCDDDEEEFFFGGGHHHIII?  Seems questionable.
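
The two orderings being compared can be generated with a few lines of code. This is only an illustration: the helper names, the batches of three partitions, and the three records per partition are assumptions, not anything taken from the consumer.

```python
def interleaved(batch, records_per_partition):
    """Alternate between the partitions of one fetch batch: ABCABCABC."""
    return "".join(batch) * records_per_partition

def grouped(batch, records_per_partition):
    """Drain each partition of one fetch batch in turn: AAABBBCCC."""
    return "".join(p * records_per_partition for p in batch)

# Each fetch only returns records from three partitions at a time.
batches = [["A", "B", "C"], ["D", "E", "F"], ["G", "H", "I"]]

interleaved_order = "".join(interleaved(b, 3) for b in batches)
grouped_order = "".join(grouped(b, 3) for b in batches)
print(interleaved_order)  # ABCABCABCDEFDEFDEFGHIGHIGHI
print(grouped_order)      # AAABBBCCCDDDEEEFFFGGGHHHIII
```

In both orderings every record from A, B and C is delivered before any record from D, so the interleaving only changes the order within a batch.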

It feels like maybe what you really want is a work queue for A, B, C, D, etc. so that messages from different partitions can be processed in parallel.  And then perhaps pause fetching a partition when the work queue for that partition grows too long.
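
A minimal sketch of that idea, assuming a hypothetical PartitionWorkQueues helper and an arbitrary max_depth threshold, might look like the following. It only tracks which partitions should be paused; wiring that set to the consumer is left out.

```python
from collections import defaultdict, deque

class PartitionWorkQueues:
    """One work queue per partition, with a simple back-pressure signal."""

    def __init__(self, max_depth):
        self.max_depth = max_depth        # queue length that triggers a pause
        self.queues = defaultdict(deque)  # partition -> pending records
        self.paused = set()               # partitions that should not be fetched

    def enqueue(self, partition, record):
        """Add a record; mark the partition paused once its queue is full."""
        queue = self.queues[partition]
        queue.append(record)
        if len(queue) >= self.max_depth:
            self.paused.add(partition)

    def take(self, partition):
        """Remove the oldest record; unpause once the queue has room again."""
        queue = self.queues[partition]
        record = queue.popleft() if queue else None
        if len(queue) < self.max_depth:
            self.paused.discard(partition)
        return record

work = PartitionWorkQueues(max_depth=2)
work.enqueue("A", "a1")
work.enqueue("A", "a2")   # A reaches the threshold and is marked paused
work.enqueue("B", "b1")
print(sorted(work.paused))  # ['A']
first = work.take("A")      # frees a slot, so A is unpaused
print(first, sorted(work.paused))  # a1 []
```

In a real application the paused set would presumably drive the consumer's pause and resume calls mentioned elsewhere in this thread, with one worker draining each queue in parallel.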

> 
> The original KIP 41 discussed "Ensuring Fair Consumption", which means 
> it originally intended to take that into account in the Consumer code. 
> The proposed change takes the current algorithm closer to that goal, 
> IMHO. I could implement that logic at the caller side, but that would 
> mean each library user needs to know the inner workings of the consumer 
> code and implement the logic on their own. Though I am a first timer 
> here, I do appreciate the complexity and functionality in the client 
> library and feel that we'd be better off as a community implementing 
> the logic in the library so the complexity is hidden from library users.

The discussion in KIP-41 in the "ensuring fair consumption" section is about making sure that no partitions get starved forever.  This would happen if, for example, we just constantly fetched from a single partition and never fetched from some other partition.  The discussion in that KIP isn't about interleaving the partition order of the buffered records we return to the consumer.

best,
Colin


> 
> Thanks, CH
> 
> -----Original Message-----
> From: Colin McCabe <cm...@apache.org>
> Sent: Saturday, December 22, 2018 3:53 AM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing Wu,
> 
> Maybe I'm misunderstanding something, but I'm not sure I see the need 
> for a KIP here.  You can just set max.partition.fetch.bytes to a very 
> small value.  That will cause Kafka to fetch only one message from 
> each partition.  This will give you the round robin behavior you want.
> 
> Alternately, if you don't want to change max.partition.fetch.bytes, 
> you could do your own buffering to get round robin behavior.  Keep a 
> buffer of messages from partition A, B, C, and D and hold back the 
> messages from A, B, and C until one from D arrives, so that the A B C 
> D A B C D... etc. order always repeats.
> 
> best,
> Colin
> 
> 
> On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> > Looking back at the email thread, I think one of the comments from 
> > Mayuresh questioned whether this change needs a KIP at all, since 
> > the KafkaConsumer does not guarantee the end user any order, and 
> > so the change does not alter the contract with users.
> > 
> > I entered the KIP based on suggestions from the attached email when 
> > going through the code contribution process. I am not sure what to do 
> > next in this KIP process. Could anyone please help/advise me on what to do next?
> > 
> > Thanks!
> > 
> > CH
> > 
> > -----Original Message-----
> > From: ChienHsing Wu <ch...@opentext.com>
> > Sent: Wednesday, December 12, 2018 1:05 PM
> > To: dev@kafka.apache.org
> > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> > 
> > Good to know that, Thanks! 
> > 
> > Nonetheless, that introduces additional complexity at the client 
> > side for a common expectation: to receive records in a more or less 
> > fair fashion.
> > 
> > CH
> > 
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Wednesday, December 12, 2018 12:55 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> > 
> > Hi ChienHsing,
> > 
> > We are actually working on buffering the already fetched data for 
> > paused topicPartitions, so ideally it should not have any effect on 
> > performance.
> > Associated jira : 
> > https://issues.apache.org/jira/browse/KAFKA-7548
> > 
> > Thanks,
> > 
> > Mayuresh
> > 
> > On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> > 
> > > Hi Mayuresh,
> > >
> > > Thanks for the input!
> > >
> > > Pausing and resuming are cumbersome and have some undesirable 
> > > performance impact, since pausing will in effect clean up the 
> > > completed fetch and resuming will call the broker to retrieve it again.
> > >
> > > The way I changed the code was just to parse the completed fetch 
> > > earlier and ensure the retrieval order is the same as that of the completed fetch queue.
> > > I did make code changes in the Fetcher class to take the following into account.
> > >
> > > 1) exception handling
> > > 2) ensure the parsed partitions are not included in 
> > > fetchablePartitions
> > > 3) clear buffer when not in the newly assigned partitions in 
> > > clearBufferedDataForUnassignedPartitions
> > > 4) close them properly in close method
> > >
> > > Though the consumer does not guarantee explicit order, KIP 41 
> > > (link below) did intend to ensure fair distribution, hence the 
> > > round robin algorithm in the code. The change I propose is meant to enhance it.
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
> > >
> > > As for performance, the change does not add any additional calls 
> > > to the broker, nor does it introduce significant processing logic; 
> > > it just parses the completed fetch earlier and keeps a list to manage them.
> > >
> > >
> > > CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 6:58 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > The other way I was thinking this can be done outside of 
> > > KafkaConsumer is by pausing and resuming TopicPartitions (maybe in round robin fashion).
> > > There is a gotcha there, in that you might not know if the 
> > > consumer has already fetched data for the remaining partitions.
> > > Also, I am not sure we need a KIP for this, as the KafkaConsumer 
> > > does not guarantee the end user any order, I believe. So if this 
> > > change goes in, I don't think it's changing the underlying behavior.
> > > It would be good to check if this change will impact the 
> > > performance of the consumer.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Hi Mayuresh,
> > > >
> > > > To serve one poll call the logic greedily gets records from one 
> > > > completed fetch before including records from the next completed 
> > > > fetch from the queue, as you described.
> > > >
> > > > The algorithm remembers the current completed fetch as the starting 
> > > > one when serving the next poll call. The net effect is that one 
> > > > completed fetch will serve as many poll calls as it takes to drain it 
> > > > before records are retrieved from any other completed fetch.
> > > >
> > > > For example, let's say the consumer has been assigned partitions 
> > > > A, B and C and max.poll.records is set to 100. Right now we 
> > > > have completed fetches for A and B. Each one has 300 records. It will 
> > > > take 6 poll calls to retrieve all records, and the sequence of 
> > > > retrieved partitions will be: A, A, A, B, B, B.
> > > >
> > > > Ideally, it should alternate between A and B. I was proposing to 
> > > > move to the next fetch for the next poll call based on the 
> > > > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > > The implementation parses the completed fetch only once.
> > > >
> > > > Thanks, CH
> > > >
> > > > -----Original Message-----
> > > > From: Mayuresh Gharat <gh...@gmail.com>
> > > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > > Hi ChienHsing,
> > > >
> > > > Thanks for the KIP.
> > > > It would be great if you could explain with an example what you mean by 
> > > > "Currently the implementation will return available records 
> > > > starting from the last partition the last poll call retrieves records from.
> > > > This leads to unfair patterns of record consumption from 
> > > > multiple partitions."
> > > >
> > > > KafkaConsumer sends fetch requests to multiple brokers, 
> > > > gets the corresponding responses and puts them into a 
> > > > single queue of CompletedFetches. It then iterates over this 
> > > > queue and peels off up to max.poll.records records 
> > > > from each completedFetch for each poll() before 
> > > > moving on to the next completedFetch. Also, it does not send a fetch 
> > > > request for a TopicPartition if we already have buffered data 
> > > > (completedFetch or nextInlineRecord) for that TopicPartition. It also moves the 
> > > > TopicPartition to the end of the assignment queue once it has 
> > > > received data from the broker for that TopicPartition, to maintain 
> > > > a round robin fetch sequence for fairness.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > > <ch...@opentext.com>
> > > > wrote:
> > > >
> > > > > Jason,
> > > > >
> > > > >
> > > > >
> > > > > KIP 41 was initiated by you and this KIP is to change the 
> > > > > logic discussed in the Ensure Fair Consumption
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption>.
> > > > > Your input on KIP-387
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer>
> > > > > would be very valuable.
> > > > >
> > > > >
> > > > >
> > > > > Thanks, ChienHsing
> > > > >
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > > > To: dev@kafka.apache.org
> > > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > > Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > >
> > > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > Any comments/updates? I am not sure the next steps if no one 
> > > > > has any further comments.
> > > > >
> > > > >
> > > > >
> > > > > Thanks, CH
> > > > >
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > >
> > > > > From: ChienHsing Wu
> > > > > <ch...@opentext.com>>
> > > > >
> > > > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > > >
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > > >
> > > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > > Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > >
> > > > >
> > > > > Hi Matt,
> > > > >
> > > > >
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > >
> > > > >
> > > > > The issue with the current design is that it stays on the 
> > > > > previous partition even if the last poll call consumes the 
> > > > > max.poll.records; it will consume all records in that 
> > > > > partition available at the consumer side to serve multiple 
> > > > > poll calls before moving to the next partition.
> > > > >
> > > > >
> > > > >
> > > > > Introducing another threshold at partition level will decrease 
> > > > > the number of records consumed in one partition within one 
> > > > > poll call but will still use that same partition as the 
> > > > > starting one in the next poll call.
> > > > >
> > > > >
> > > > >
> > > > > The same effect can be achieved by setting max.poll.records to 
> > > > > 100, I believe. The main difference is that the client will 
> > > > > need to make more poll calls when that value is set to 100, 
> > > > > and because poll is non-blocking I believe the cost of the 
> > > > > extra poll calls is not significant.
> > > > >
> > > > >
> > > > >
> > > > > Further thoughts?
> > > > >
> > > > >
> > > > >
> > > > > Thanks, CH
> > > > >
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > >
> > > > > From: Matt Farmer <ma...@frmr.me>>
> > > > >
> > > > > Sent: Monday, November 19, 2018 9:32 PM
> > > > >
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > > >
> > > > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > > Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > >
> > > > >
> > > > > Hi there,
> > > > >
> > > > >
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > >
> > > > >
> > > > > We’ve run into issues with this at Mailchimp so something to 
> > > > > address consuming behavior would save us from having to always 
> > > > > ensure we’re running enough consumers that each consumer has 
> > > > > only one partition (which is our usual MO).
> > > > >
> > > > >
> > > > >
> > > > > I wonder though if it would be simpler and more powerful to 
> > > > > define the maximum number of records the consumer should pull 
> > > > > from one partition before pulling some records from another?
> > > > >
> > > > >
> > > > >
> > > > > So if you set max.poll.records to 500 and then some new 
> > > > > setting, max.poll.records.per.partition, to 100 then the 
> > > > > Consumer would switch what partition it reads from every 100 
> > > > > records - looping back around to the first partition that had 
> > > > > records if there aren’t 5 or more partitions with records.
> > > > >
> > > > >
> > > > >
> > > > > What do you think?
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > > > >
> > > > >
> > > > >
> > > > > > Hi, could anyone please review this KIP?
> > > > >
> > > > > >
> > > > >
> > > > > > Thanks, ChienHsing
> > > > >
> > > > > >
> > > > >
> > > > > > From: ChienHsing Wu
> > > > >
> > > > > > Sent: Friday, November 09, 2018 1:10 PM
> > > > >
> > > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > > >
> > > > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption 
> > > > > > Across
> > > > >
> > > > > > Partitions in KafkaConsumer
> > > > >
> > > > > >
> > > > >
> > > > > > Just to check: Will anyone review this? It's been silent for a week...
> > > > >
> > > > > > Thanks, ChienHsing
> > > > >
> > > > > >
> > > > >
> > > > > > From: ChienHsing Wu
> > > > >
> > > > > > Sent: Monday, November 05, 2018 4:18 PM
> > > > >
> > > > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org>
> > > > >
> > > > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > > > > Partitions
> > > > >
> > > > > > in KafkaConsumer
> > > > >
> > > > > >
> > > > >
> > > > > > Hi I just put together the KIP page as requested. This email 
> > > > > > is to
> > > > >
> > > > > > start the discussion thread.
> > > > >
> > > > > >
> > > > >
> > > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer>
> > > > >
> > > > > > Pull Request: https://github.com/apache/kafka/pull/5838
> > > > >
> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-3932
> > > > >
> > > > > >
> > > > >
> > > > > > Thanks, CH
> > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> > 
> > 
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> > Email had 1 attachment:
> > + Re: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a round robin fashion
> >   15k (message/rfc822)
>

Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by Colin McCabe <cm...@apache.org>.
On Mon, Jan 28, 2019, at 06:57, ChienHsing Wu wrote:
> So... Does this non-response mean I should drop this topic after almost 
> one month, folks?

Hi ChienHsing,

I would recommend dropping it, since I don't see a lot of uses for it.  Maybe there is something I missed, though.  See my responses below:

> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com> 
> Sent: Monday, January 21, 2019 12:47 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi all, not sure what to do next as weeks have gone by, guys. --CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Monday, January 14, 2019 9:13 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi,
> 
> I know everyone is busy. But I would appreciate someone letting me know 
> what to do next. I started this effort back in early November 
> last year...
> 
> Thanks, CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Monday, January 07, 2019 9:24 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi guys,
> 
> I am not sure what to do next in this KIP process. Could anyone please 
> help/advise me on what to do next? 
> 
> Thanks, CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, January 02, 2019 12:55 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi Colin,
> 
> Setting max.partition.fetch.bytes was discussed in the ticket. It's not 
> as desirable if the message size is highly variable. Also, this decreases 
> the efficiency of network communication. 

Even if the message size is highly variable, you can still set max.partition.fetch.bytes to a very small value, perhaps even 1.  Since we always return at least one record, regardless of size limits, this will have the effect you want.  I agree that this may be less efficient than fetching a larger number of records.  But it will get you true round-robin behavior.

> 
> In the case you mentioned below where a consumer can get messages from 
> A, B, C and D but the consumer currently only has messages from A, B 
> and C, the proposed change will NOT wait until some messages from D 
> arrive to start returning messages; it will just serve those from A, B 
> and C. It will include those from D when they are available. That IS 
> the current behavior. The proposed change does not impose a strict 
> round robin pattern.

I guess this is debatable, but the behavior you're proposing doesn't seem "fairer" than what we do now.  Imagine you are subscribed to 100 partitions and each fetch request only gives you back records from 3 of them.  Then the order the consumer sees records might be something like:

ABCABCABCDEFDEFDEFDEFGHIGHIGHI... etc.

Is this fairer than AAABBBCCCDDDEEEFFFGGGHHHIII?  Seems questionable.

It feels like maybe what you really want is a work queue for A, B, C, D, etc. so that messages from different partitions can be processed in parallel.  And then perhaps pause fetching a partition when the work queue for that partition grows too long.

> 
> The original KIP 41 discussed "Ensuring Fair Consumption", which means 
> it originally intended to take that into account in the Consumer code. 
> The proposed change takes the current algorithm closer to that goal, 
> IMHO. I could implement that logic at the caller side, but that would 
> mean each library user needs to know the inner workings of the consumer 
> code and implement the logic on their own. Though I am a first timer 
> here, I do appreciate the complexity and functionality in the client 
> library and feel that we'd be better off as a community implementing 
> the logic in the library so the complexity is hidden from library users.

The discussion in KIP-41 in the "ensuring fair consumption" section is about making sure that no partitions get starved forever.  This would happen if, for example, we just constantly fetched from a single partition and never fetched from some other partition.  The discussion in that KIP isn't about interleaving the partition order of the buffered records we return to the consumer.

best,
Colin



RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
So... Does this non-response mean I should drop this topic after almost one month, folks?

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com> 
Sent: Monday, January 21, 2019 12:47 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi all, not sure what to do next as weeks have gone by, guys. --CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Monday, January 14, 2019 9:13 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi,

I know everyone is busy, but I would appreciate someone letting me know what to do next. I started this effort back in early November of last year...

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Monday, January 07, 2019 9:24 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi guys,

I am not sure what to do next in this KIP process. Could anyone please help/advise me on what to do next? 

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Wednesday, January 02, 2019 12:55 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It's not as desirable if the message size is highly variable. It also decreases the efficiency of network communication.

In the case you mentioned below, where a consumer can get messages from A, B, C and D but currently only has messages from A, B and C, the proposed change will NOT wait until some messages from D arrive before it starts returning messages; it will just serve those from A, B and C, and include those from D when they are available. That IS the current behavior. The proposed change does not impose a strict round robin pattern.
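
To make the difference concrete, here is a toy model in plain Java, not the real Fetcher code. It assumes each poll drains up to max.poll.records from exactly one buffered partition and records which partition served it; the class and method names are hypothetical. The real consumer can top up a poll from the next completed fetch, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of buffered completed fetches: partition name -> buffered record count.
class PollOrderSketch {

    // Current behaviour as described in this thread: keep serving polls from
    // the same buffered partition until it is empty, then move on.
    static List<String> currentOrder(Map<String, Integer> buffered, int maxPollRecords) {
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            int remaining = e.getValue();
            while (remaining > 0) {
                order.add(e.getKey());
                remaining -= Math.min(remaining, maxPollRecords);
            }
        }
        return order;
    }

    // Proposed behaviour: after each poll, move to the next partition that has
    // buffered records. Partitions with nothing buffered are skipped, not awaited.
    static List<String> proposedOrder(Map<String, Integer> buffered, int maxPollRecords) {
        Map<String, Integer> left = new LinkedHashMap<>(buffered); // work on a copy
        List<String> order = new ArrayList<>();
        boolean servedAny = true;
        while (servedAny) {
            servedAny = false;
            for (Map.Entry<String, Integer> e : left.entrySet()) {
                int remaining = e.getValue();
                if (remaining == 0) {
                    continue; // e.g. partition D with no data yet
                }
                order.add(e.getKey());
                e.setValue(remaining - Math.min(remaining, maxPollRecords));
                servedAny = true;
            }
        }
        return order;
    }
}
```

With A, B and C each holding 300 buffered records, D holding none, and max.poll.records at 100, the first method yields A, A, A, B, B, B, C, C, C and the second yields A, B, C repeated three times; D never blocks either one.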

The original KIP 41 discussed "Ensuring Fair Consumption", which means it originally intended to take that into account in the Consumer code; the proposed change takes the current algorithm closer to that goal, IMHO. I could implement that logic on the caller side, but that would mean each library user needs to know the inner workings of the consumer code and implement the logic on their own. As a first-timer here, I do appreciate the complexity and functionality in the client library, and I feel that we'd be better off as a community implementing the logic in the library so that the complexity is hidden from library users.

Thanks, CH

-----Original Message-----
From: Colin McCabe <cm...@apache.org>
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP here.  You can just set max.partition.fetch.bytes to a very small value.  That will cause Kafka to fetch only one message from each partition.  This will give you the round robin behavior you want.
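
As a rough illustration of that setting, the override could be built like this. max.partition.fetch.bytes is the documented consumer config key; the value "1" is only a stand-in for "very small", and the class and method names are hypothetical. As far as I know, the consumer still returns the first record batch of a partition even when that batch is larger than the limit, so the actual number of messages per partition depends on how they were batched.

```java
import java.util.Properties;

// Hypothetical holder for the override; merge into the rest of the consumer config.
class SmallFetchConfigSketch {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Cap the data returned per partition per fetch; "1" is an illustrative value.
        props.setProperty("max.partition.fetch.bytes", "1");
        return props;
    }
}
```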

Alternately, if you don't want to change max.partition.fetch.bytes, you could do your own buffering to get round robin behavior.  Keep a buffer of messages from partition A, B, C, and D and hold back the messages from A, B, and C until one from D arrives, so that the A B C D A B C D... etc. order always repeats.
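
A minimal sketch of that caller-side buffering, in plain Java with no Kafka dependency. The class name and the string-based partitions and messages are assumptions for illustration. Messages are released only in complete rounds, so anything from A, B and C is held back until D has a message too.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical caller-side buffer that enforces a strict A B C D A B C D... order.
class StrictRoundRobinBuffer {
    private final Map<String, Deque<String>> buffers = new LinkedHashMap<>();

    StrictRoundRobinBuffer(List<String> partitions) {
        for (String p : partitions) {
            buffers.put(p, new ArrayDeque<>());
        }
    }

    // Buffer a message that was returned by poll() for the given partition.
    void add(String partition, String message) {
        Deque<String> q = buffers.get(partition);
        if (q == null) {
            throw new IllegalArgumentException("unknown partition: " + partition);
        }
        q.addLast(message);
    }

    // Release as many complete rounds as possible. Returns an empty list
    // while at least one partition has nothing buffered.
    List<String> drainCompleteRounds() {
        List<String> out = new ArrayList<>();
        while (!buffers.isEmpty()
                && buffers.values().stream().noneMatch(Deque::isEmpty)) {
            for (Deque<String> q : buffers.values()) {
                out.add(q.pollFirst());
            }
        }
        return out;
    }
}
```

The cost of this strict variant is latency: one idle partition stalls delivery for all the others.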

best,
Colin


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back at the email thread, I think one of Mayuresh's comments 
> questioned whether this change needs a KIP at all, since the 
> KafkaConsumer does not guarantee the end user any order and so the 
> change does not alter the contract with users.
> 
> I entered the KIP based on suggestions from the attached email when 
> going through the code contribution process. I am not sure what to do 
> next in this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity on the client side 
> for what is a common expectation: to receive records in a more or less 
> fair fashion.
> 
> CH
> 
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira: https://issues.apache.org/jira/browse/KAFKA-7548
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > Pausing and resuming are cumbersome and have some undesirable 
> > performance impact, since pausing will in effect clean up the 
> > completed fetch and resuming will call the broker to retrieve it again.
> >
> > The way I changed the code was just to parse the completed fetch 
> > earlier and ensure that the retrieval order is the same as the order 
> > of the completed fetch queue. I did make code changes in the Fetcher 
> > class to take the following into account:
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in 
> > fetchablePartitions
> > 3) clear buffer when not in the newly assigned partitions in 
> > clearBufferedDataForUnassignedPartitions
> > 4) close them properly in close method
> >
> > Though the consumer does not guarantee an explicit order, KIP 41 (link
> > below) did intend to ensure fair distribution, hence the round robin 
> > algorithm in the code. The change I propose is meant to enhance it.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
> >
> > As for performance, the change does not add any additional calls to 
> > the broker, nor does it introduce significant processing logic; it 
> > just parses the completed fetch earlier and keeps a list to manage them.
> >
> >
> > CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 6:58 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > The other way I was thinking this can be done, outside of 
> > KafkaConsumer, is by pausing and resuming TopicPartitions (maybe in round robin fashion).
> > There is a gotcha there, in that you might not know if the consumer 
> > has already fetched data for the remaining partitions.
> > Also, I am not sure we need a KIP for this, as I believe the 
> > KafkaConsumer does not guarantee the end user any order. So if this 
> > change goes in, I don't think it's changing the underlying behavior.
> > It would be good to check if this change will impact the performance 
> > of the consumer.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu 
> > <ch...@opentext.com>
> > wrote:
> >
> > > Hi Mayuresh,
> > >
> > > To serve one poll call, the logic greedily gets records from one 
> > > completed fetch before including records from the next completed 
> > > fetch in the queue, as you described.
> > >
> > > The algorithm remembers the current completed fetch as the starting 
> > > one when serving the next poll call. The net effect is that one 
> > > completed fetch serves as many poll calls as it can before records 
> > > are retrieved from any other completed fetch.
> > >
> > > For example, let's say the consumer has been assigned partitions A, 
> > > B and C and max.poll.records is set to 100. Right now we have 
> > > completed fetches for A and B, each with 300 records. It will take 6 
> > > poll calls to retrieve all records, and the sequence of retrieved 
> > > partitions will be: A, A, A, B, B, B.
> > >
> > > Ideally, it should alternate between A and B. I was proposing to 
> > > move to the next completed fetch for the next poll call, based on the 
> > > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > The implementation parses each completed fetch only once.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > Thanks for the KIP.
> > > It would be great if you can explain with an example what you mean by
> > > "Currently the implementation will return available records 
> > > starting from the last partition the last poll call retrieves records from.
> > > This leads to unfair patterns of record consumption from multiple 
> > > partitions."
> > >
> > > KafkaConsumer sends fetch requests to multiple brokers, 
> > > gets the corresponding responses, and puts them into a single 
> > > queue of CompletedFetches. It then iterates over this queue of 
> > > completed fetches and peels off up to max.poll.records records 
> > > from each completedFetch for each poll() before moving on to the next 
> > > completedFetch. Also, it does not send a fetch request for a 
> > > TopicPartition if we already have buffered data (completedFetch 
> > > or nextInlineRecord) for that TopicPartition. It also moves the 
> > > TopicPartition to the end of the assignment queue once it has 
> > > received data from the broker for that TopicPartition, to maintain 
> > > a round robin fetch sequence for fairness.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > >
> > > >
> > > > KIP 41 was initiated by you, and this KIP is to change the logic 
> > > > discussed in the Ensure Fair Consumption section 
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption>.
> > > > Your input on KIP-387 
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer>
> > > > would be very valuable.
> > > >
> > > >
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > Any comments/updates? I am not sure what the next steps are if no 
> > > > one has any further comments.
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi Matt,
> > > >
> > > >
> > > >
> > > > Thanks for the feedback.
> > > >
> > > >
> > > >
> > > > The issue with the current design is that it stays on the 
> > > > previous partition even if the last poll call consumes the 
> > > > max.poll.records; it will consume all records in that partition 
> > > > available at the consumer side to serve multiple poll calls 
> > > > before moving to the next partition.
> > > >
> > > >
> > > >
> > > > Introducing another threshold at partition level will decrease 
> > > > the number of records consumed in one partition within one poll 
> > > > call but will still use that same partition as the starting one 
> > > > in the next poll call.
> > > >
> > > >
> > > >
> > > > The same effect can be achieved by setting max.poll.records to 
> > > > 100, I believe. The main difference is that the client will need 
> > > > to make more poll calls when that value is set to 100, and 
> > > > because of the non-blocking nature I believe the cost of the extra 
> > > > poll calls is not significant.
> > > >
> > > >
> > > >
> > > > Further thoughts?
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Matt Farmer <ma...@frmr.me>
> > > > Sent: Monday, November 19, 2018 9:32 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi there,
> > > >
> > > >
> > > >
> > > > Thanks for the KIP.
> > > >
> > > >
> > > >
> > > > We’ve run into issues with this at Mailchimp so something to 
> > > > address consuming behavior would save us from having to always 
> > > > ensure we’re running enough consumers that each consumer has 
> > > > only one partition (which is our usual MO).
> > > >
> > > >
> > > >
> > > > I wonder though if it would be simpler and more powerful to 
> > > > define the maximum number of records the consumer should pull 
> > > > from one partition before pulling some records from another?
> > > >
> > > >
> > > >
> > > > So if you set max.poll.records to 500 and then some new setting, 
> > > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > > switch what partition it reads from every 100 records - looping 
> > > > back around to the first partition that had records if there 
> > > > aren’t 5 or more partitions with records.
> > > >
> > > >
> > > >
> > > > What do you think?
> > > >
> > > >
> > > >
> > > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > > >
> > > >
> > > >
> > > > > Hi, could anyone please review this KIP?
> > > > >
> > > > > Thanks, ChienHsing
> > > > >
> > > > > From: ChienHsing Wu
> > > > > Sent: Friday, November 09, 2018 1:10 PM
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > > Just to check: Will anyone review this? It's been silent for a week...
> > > > >
> > > > > Thanks, ChienHsing
> > > > >
> > > > > From: ChienHsing Wu
> > > > > Sent: Monday, November 05, 2018 4:18 PM
> > > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer
> > > > >
> > > > > Hi I just put together the KIP page as requested. This email is to start the discussion thread.
> > > > >
> > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer<
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=
> > > > > >
> > > > > Pull Request:
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > > > > Jira:
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > > > >
> > > > > Thanks, CH
> > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
> 
> 
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
> Email had 1 attachment:
> + Re: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a round robin fashion
>   15k (message/rfc822)

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi all, not sure what to do next as weeks have gone by, guys. --CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com> 
Sent: Monday, January 14, 2019 9:13 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi,

I know everyone is busy. But I would appreciate someone letting me know what to do next. I started this effort back in early November of last year...

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Monday, January 07, 2019 9:24 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi guys,

I am not sure what to do next in this KIP process. Could anyone please help/advise me on what to do next? 

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Wednesday, January 02, 2019 12:55 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It's not as desirable if the message size is highly variable. It also decreases the efficiency of network communication.

In the case you mentioned below, where a consumer can get messages from A, B, C and D but currently only has messages from A, B and C, the proposed change will NOT wait until some messages from D arrive before it starts returning messages; it will just serve those from A, B and C, and include those from D when they are available. That IS the current behavior. The proposed change does not impose a strict round robin pattern.
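
To make the difference concrete, here is a minimal simulation of the two behaviors discussed in this thread. It is only a sketch under stated assumptions: it does not use the Kafka client, and the names FairPollSketch, Buffered and simulate are hypothetical. It models the consumer-side buffer as a queue of per-partition batches. With rotate=false a partially drained batch stays at the head of the queue (the behavior described as current); with rotate=true it moves to the tail (roughly what the KIP proposes). Partitions with nothing buffered are simply skipped, never waited for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the consumer-side buffer; not Kafka's Fetcher class.
final class FairPollSketch {

    /** One buffered batch: a partition name and the number of unread records. */
    static final class Buffered {
        final String partition;
        int remaining;

        Buffered(String partition, int remaining) {
            this.partition = partition;
            this.remaining = remaining;
        }
    }

    /**
     * Simulates poll() calls until the buffer is empty. Each poll returns at most
     * maxPollRecords records. The result has one entry per poll, naming the
     * partition(s) that contributed records to that poll.
     */
    static List<String> simulate(Map<String, Integer> buffered, int maxPollRecords, boolean rotate) {
        Deque<Buffered> queue = new ArrayDeque<>();
        // Partitions with no buffered records never enter the queue.
        buffered.forEach((p, n) -> { if (n > 0) queue.addLast(new Buffered(p, n)); });

        List<String> polls = new ArrayList<>();
        while (!queue.isEmpty()) {
            int budget = maxPollRecords;
            StringBuilder served = new StringBuilder();
            while (budget > 0 && !queue.isEmpty()) {
                Buffered head = queue.pollFirst();
                int taken = Math.min(budget, head.remaining);
                head.remaining -= taken;
                budget -= taken;
                served.append(head.partition);
                if (head.remaining > 0) {
                    // The poll budget is used up; decide where the leftover batch goes.
                    if (rotate) queue.addLast(head); else queue.addFirst(head);
                }
            }
            polls.add(served.toString());
        }
        return polls;
    }

    public static void main(String[] args) {
        Map<String, Integer> buffered = new LinkedHashMap<>();
        buffered.put("A", 300);
        buffered.put("B", 300);
        buffered.put("C", 0); // assigned, but nothing fetched yet
        System.out.println("stay on head: " + simulate(buffered, 100, false));
        System.out.println("rotate:       " + simulate(buffered, 100, true));
    }
}
```

With A and B each holding 300 buffered records and max.poll.records at 100, the first mode serves A three times before B, while the second alternates between A and B. Partition C has nothing buffered and does not delay either mode.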

The original KIP 41 discussed "Ensuring Fair Consumption", which means it originally intended to take fairness into account in the consumer code; the proposed change takes the current algorithm closer to that goal, IMHO. I could implement that logic on the caller side, but that would mean each library user needs to know the inner workings of the consumer code and implement the logic on their own. As a first-timer here, I do appreciate the complexity and functionality of the client library, and I feel that we'd be better off as a community implementing the logic in the library so the complexity is hidden from library users.

Thanks, CH

-----Original Message-----
From: Colin McCabe <cm...@apache.org>
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP here.  You can just set max.partition.fetch.bytes to a very small value.  That will cause Kafka to fetch only one message from each partition.  This will give you the round robin behavior you want.
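
For reference, a sketch of what that configuration could look like when building consumer properties. The broker address, group id and the 1024-byte value are illustrative assumptions, not recommendations, and the helper class SmallFetchConfigSketch is hypothetical. Note that, as far as the consumer configuration documentation describes it, the limit is expressed in bytes and the first record batch of a partition is still returned even when it is larger than the limit, so a small value bounds the data per partition rather than strictly guaranteeing one message.

```java
import java.util.Properties;

// Hypothetical helper; only plain java.util.Properties is used here.
final class SmallFetchConfigSketch {

    /** Builds consumer properties with a deliberately small per-partition fetch size. */
    static Properties consumerProps(String bootstrapServers, String groupId, int maxPartitionFetchBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The setting discussed above: caps the bytes returned per partition per fetch.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxPartitionFetchBytes));
        return props;
    }

    public static void main(String[] args) {
        // All three argument values are assumptions chosen for illustration.
        Properties props = consumerProps("localhost:9092", "fair-demo", 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor. The trade-offs raised elsewhere in the thread (variable message sizes, more round trips to the broker) still apply.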

Alternately, if you don't want to change max.partition.fetch.bytes, you could do your own buffering to get round robin behavior.  Keep a buffer of messages from partition A, B, C, and D and hold back the messages from A, B, and C until one from D arrives, so that the A B C D A B C D... etc. order always repeats.
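
The caller-side buffering described above could be sketched roughly as follows. This is an illustration only: StrictRoundRobinBuffer and its methods are hypothetical names, partitions are identified by plain strings, and wiring it to the records returned by poll() is left out. It releases messages only in complete rounds, so messages from A, B and C are held back until one from D arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical caller-side buffer enforcing a strict A B C D A B C D... order.
final class StrictRoundRobinBuffer<T> {
    // LinkedHashMap keeps the partitions in the fixed order given at construction.
    private final Map<String, Deque<T>> buffers = new LinkedHashMap<>();

    StrictRoundRobinBuffer(List<String> partitions) {
        for (String p : partitions) {
            buffers.put(p, new ArrayDeque<>());
        }
    }

    /** Buffers one message for the given partition. */
    void add(String partition, T message) {
        Deque<T> queue = buffers.get(partition);
        if (queue == null) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        queue.addLast(message);
    }

    /**
     * Releases as many complete rounds as possible, one message per partition
     * in fixed order. Returns nothing while any partition has no message.
     */
    List<T> drainCompleteRounds() {
        List<T> out = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.values().stream().noneMatch(Deque::isEmpty)) {
            for (Deque<T> queue : buffers.values()) {
                out.add(queue.pollFirst());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        StrictRoundRobinBuffer<String> buffer =
                new StrictRoundRobinBuffer<>(Arrays.asList("A", "B", "C", "D"));
        buffer.add("A", "a1");
        buffer.add("B", "b1");
        buffer.add("C", "c1");
        System.out.println(buffer.drainCompleteRounds()); // nothing released yet
        buffer.add("D", "d1");
        System.out.println(buffer.drainCompleteRounds()); // one full round
    }
}
```

The design choice to notice is that strictness costs latency: a quiet partition stalls all the others, and the buffers of the busy partitions keep growing while they wait.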

best,
Colin


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back at the email thread, I think one of Mayuresh's comments 
> questioned whether this change needs a KIP at all, since the 
> KafkaConsumer does not guarantee the end user any order and so the 
> change does not alter the contract with users.
> 
> I created the KIP based on suggestions from the attached email while 
> going through the code contribution process. I am not sure what to do 
> next in this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity on the client side 
> for what is a common expectation: to receive records in a more or less 
> fair fashion.
> 
> CH
> 
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira: 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D7548&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU&e=
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > Pausing and resuming are cumbersome and have some undesirable 
> > performance impact, since pausing will in effect clean up the 
> > completed fetch and resuming will call the broker to retrieve the data again.
> >
> > The way I changed the code was just to parse the completed fetch 
> > earlier and ensure that the retrieval order is the same as the order 
> > of the completed fetch queue. I also made code changes in the Fetcher 
> > class to take the following into account:
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in fetchablePartitions
> > 3) clear the buffer for partitions that are not in the newly assigned 
> > partitions, in clearBufferedDataForUnassignedPartitions
> > 4) close them properly in the close method
> >
> > Though the consumer does not guarantee explicit order, KIP 41 (link 
> > below) did intend to ensure fair distribution, hence the round robin 
> > algorithm in the code. The change I propose enhances it.
> >
> >
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=NKZHA5HVggfKWlF_yg6V3-Wyf_Z6x7n1HQPQ1_M0d9A&e=
> >
> > As for performance, the change does not add any additional calls to 
> > the broker, nor does it introduce significant processing logic; it 
> > just parses the completed fetch earlier and keeps a list to manage them.
> >
> >
> > CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 6:58 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > Another way I was thinking this could be done, outside of 
> > KafkaConsumer, is by pausing and resuming TopicPartitions (maybe in 
> > round robin fashion). There is a gotcha there, in that you might not 
> > know if the consumer has already fetched data for the remaining partitions.
> > Also, I am not sure we need a KIP for this, as the KafkaConsumer 
> > does not guarantee the end user any order, I believe. So if this 
> > change goes in, I don't think it's changing the underlying behavior.
> > It would be good to check if this change will impact the performance 
> > of the consumer.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu 
> > <ch...@opentext.com>
> > wrote:
> >
> > > Hi Mayuresh,
> > >
> > > To serve one poll call the logic greedily gets records from one 
> > > completed fetch before including records from the next completed 
> > > fetch from the queue, as you described.
> > >
> > > The algorithm remembers the current completed fetch as the starting 
> > > one when serving the next poll call. The net effect is that one 
> > > completed fetch serves as many poll calls as it can before records 
> > > are retrieved from any other completed fetch.
> > >
> > > For example, let's say the consumer has been assigned partitions A, 
> > > B and C and max.poll.records is set to 100. Right now we have 
> > > completed fetches for A and B. Each one has 300 records. It will take 6 
> > > poll calls to retrieve all records, and the sequence of retrieved 
> > > partitions will be: A, A, A, B, B, B.
> > >
> > > Ideally, it should alternate between A and B. I was proposing to 
> > > move to the next fetch for the next poll call based on the 
> > > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > The implementation parses the completed fetch only once.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > Thanks for the KIP.
> > > It would be great if you can explain with an example what you mean by 
> > > "Currently the implementation will return available records 
> > > starting from the last partition the last poll call retrieves records from.
> > > This leads to unfair patterns of record consumption from multiple 
> > > partitions."
> > >
> > > KafkaConsumer sends fetch requests to multiple brokers, gets the 
> > > corresponding responses, and puts them into a single queue of 
> > > CompletedFetches. It then iterates over this queue and peels off up to 
> > > max.poll.records records from each completedFetch for each poll() 
> > > before moving on to the next completedFetch. Also, it does not send a 
> > > fetch request for a TopicPartition if we already have buffered data 
> > > (completedFetch or nextInlineRecord) for that TopicPartition. It also 
> > > moves the TopicPartition to the end of the assignment queue once it 
> > > has received data from the broker for that TopicPartition, to maintain 
> > > a round robin fetch sequence for fairness.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > >
> > > >
> > > > KIP 41 was initiated by you and this KIP is to change the logic 
> > > > discussed in the Ensure Fair Consumption section:
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > > > Your input on KIP-387 would be very valuable:
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=
> > > >
> > > >
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > >
> > > >

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi,

I know everyone is busy. But I would appreciate someone letting me know what to do next. I started this effort back in early November of last year...

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com> 
Sent: Monday, January 07, 2019 9:24 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi guys,

I am not sure what to do next in this KIP process. Could anyone please help/advise me on what to do next? 

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Wednesday, January 02, 2019 12:55 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It's not as desirable if the message size is highly variable. Also this decrease the efficiency of network communication. 

In the case you mentioned below where a consumer can get messages from A, B, C and D but the consumer currently only has messages from A, B and C, the proposed change will NOT wait until some messages from D arrives to start returning messages; it will just serve those from A, B and C. It will include those from D when they are available. That IS the current behavior. The proposed change does not impose a strict round robin pattern.

The original KIP 41 discussed "Ensuring Fair Consumption", that means it originally intended to take that into account in the Consumer code, the proposed change takes the current algorithm closer to that goal, IMHO. I could implement that logic at the caller side but, that would mean each library user need to know the inner working of the consumer code and to implement the logic on their own. Though as a first timer here, I do appreciate the complexity and functionalities in the client library and feel that we'd be better off as a community to implement the logic in the library so the complexity is hidden from library users.

Thanks, CH

-----Original Message-----
From: Colin McCabe <cm...@apache.org>
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP here.  You can just set max.partition.fetch.bytes to a very small value.  That will cause Kafka to fetch only one message from each partition.  This will give you the round robin behavior you want.

Alternately, if you don't want to change max.partition.fetch.bytes, you could do your own buffering to get round robin behavior.  Keep a buffer of messages from partition A, B, C, and D and hold back the messages from A, B, and C until one from D arrives, so that the A B C D A B C D... etc. order always repeats.

best,
Colin


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back the email thread I think one of the comments from 
> Mayuresh was the question about needing KIP for this change or not as 
> the KafkaConsumer does not guarantee the end user any order, and so no 
> changes to the contracts to users.
> 
> I entered KIP based on suggestions from the attached email when going 
> through code contribution process. I am not sure what to do next in 
> this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity at the client side 
> for a common expectation to more or less receives records in a fair 
> fashion.
> 
> CH
> 
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira : 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org
> _jira_browse_KAFKA-2D7548&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrb
> L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5
> uOWVIftTJ-U&s=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU&e=
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > Pausing and Resuming are cumbersome and has some undesirable 
> > performance impact since pausing will in effect clean up the 
> > completed fetch and resuming will call the broker to retrieve again.
> >
> > The way I changed the code was just to parse the completed fetch 
> > earlier and ensure the order to retrieve are the same as the completed fetch queue.
> > I did make code changes to take into account the following in Fetcher class.
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in 
> > fetchablePartitions
> > 3) clear buffer when not in the newly assigned partitions in 
> > clearBufferedDataForUnassignedPartitions
> > 4) close them properly in close method
> >
> > Though the consumer does not guarantee explicit order, KIP 41 (link
> > below) did intend to ensure fair distribution and therefore the 
> > round robin algorithm in the code. The change I propose was to enhance it.
> >
> >
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.or
> > g_
> > confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecor
> > ds
> > -23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=Dw
> > IF
> > aQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6R
> > A3
> > V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=NKZHA5HVggfKWlF_
> > yg
> > 6V3-Wyf_Z6x7n1HQPQ1_M0d9A&e=
> >
> > As for performance, the changes does not add any additional calls to 
> > the broker nor does it introduce significant processing logic; it 
> > just parses the completed fetch earlier and have a list to manage them.
> >
> >
> > CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 6:58 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > The other way I was thinking, this can be done outside of 
> > KafkaConsumer is by pausing and resuming TopicPartitions (may be in round robin fashion).
> > There is some gotcha there as in you might not know if the consumer 
> > has already fetched data for the remaining partitions.
> > Also I am not sure, if we need a KIP for this as the KafkaConsumer 
> > does not guarantee the end user, any order, I believe. So if this 
> > change goes in, I don't think its changing the underlying behavior.
> > It would be good to check if this change will impact the performance 
> > of the consumer.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu 
> > <ch...@opentext.com>
> > wrote:
> >
> > > Hi Mayuresh,
> > >
> > > To serve one poll call the logic greedily gets records from one 
> > > completed fetch before including records from the next completed 
> > > fetch from the queue, as you described.
> > >
> > > The algorithm remembers the current completed fetch as the starting 
> > > one when serving the next poll call. The net effect is that one 
> > > completed fetch serves as many poll calls as it can before records 
> > > are retrieved from any other completed fetch.
> > >
> > > For example, let's say the consumer has been assigned partitions A, 
> > > B and C and max.poll.records is set to 100. Right now we have 
> > > completed fetches for A and B, and each one has 300 records. It will take 6 
> > > poll calls to retrieve all records, and the sequence of retrieved 
> > > partitions will be: A, A, A, B, B, B.
> > >
> > > Ideally, it should alternate between A and B. I was proposing to 
> > > move to the next fetch for the next poll call based on the 
> > > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > The implementation parses each completed fetch only once.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > Thanks for the KIP.
> > > It would be great if you can explain with an example what you mean by:
> > > "Currently the implementation will return available records 
> > > starting from the last partition the last poll call retrieves records from.
> > > This leads to unfair patterns of record consumption from multiple 
> > > partitions."
> > >
> > > KafkaConsumer sends fetch requests to multiple brokers, gets the 
> > > corresponding responses, and puts them into a single 
> > > queue of CompletedFetches. It then iterates over this queue of completed 
> > > fetches and peels off a number of records = max.poll.records 
> > > from each completedFetch for each poll() before moving on to the next 
> > > completedFetch. Also, it does not send a fetch request for a 
> > > TopicPartition if we already have buffered data (completedFetch 
> > > or nextInlineRecord) for that TopicPartition. It also moves the 
> > > TopicPartition to the end of the assignment queue once it has 
> > > received data from the broker for that TopicPartition, to maintain 
> > > a round robin fetch sequence for fairness.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > >
> > > >
> > > > KIP 41 was initiated by you and this KIP is to change the logic 
> > > > discussed in the Ensure Fair Consumption<
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > > > >.
> > > > Your input on KIP-387<
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=
> > > > >
> > > > would be very valuable.
> > > >
> > > >
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > Any comments/updates? I am not sure what the next steps are if no one has 
> > > > any further comments.
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: ChienHsing Wu
> > > > <ch...@opentext.com>>
> > > >
> > > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi Matt,
> > > >
> > > >
> > > >
> > > > Thanks for the feedback.
> > > >
> > > >
> > > >
> > > > > The issue with the current design is that it stays on the 
> > > > > previous partition even if the last poll call consumes the 
> > > > > max.poll.records; it will consume all records in that partition 
> > > > > available at the consumer side to serve multiple poll calls 
> > > > > before moving to the next partition.
> > > > >
> > > > >
> > > > >
> > > > > Introducing another threshold at partition level will decrease 
> > > > > the number of records consumed in one partition within one poll 
> > > > > call but will still use that same partition as the starting one 
> > > > > in the next poll call.
> > > > >
> > > > >
> > > > >
> > > > > The same effect can be achieved by setting max.poll.records to 
> > > > > 100, I believe. The main difference is that the client will need 
> > > > > to make more poll calls when that value is set to 100, and 
> > > > > because of the non-blocking nature I believe the cost of the extra 
> > > > > poll calls is not significant.
> > > >
> > > >
> > > >
> > > > Further thoughts?
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: Matt Farmer <ma...@frmr.me>>
> > > >
> > > > Sent: Monday, November 19, 2018 9:32 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi there,
> > > >
> > > >
> > > >
> > > > Thanks for the KIP.
> > > >
> > > >
> > > >
> > > > We’ve run into issues with this at Mailchimp so something to 
> > > > address consuming behavior would save us from having to always 
> > > > ensure we’re running enough consumers that each consumer has 
> > > > only one partition (which is our usual MO).
> > > >
> > > >
> > > >
> > > > I wonder though if it would be simpler and more powerful to 
> > > > define the maximum number of records the consumer should pull 
> > > > from one partition before pulling some records from another?
> > > >
> > > >
> > > >
> > > > So if you set max.poll.records to 500 and then some new setting, 
> > > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > > switch what partition it reads from every 100 records - looping 
> > > > back around to the first partition that had records if there 
> > > > aren’t 5 or more partitions with records.
> > > >
> > > >
> > > >
> > > > What do you think?
> > > >
> > > >
> > > >
> > > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > > >
> > > >
> > > >
> > > > > Hi, could anyone please review this KIP?
> > > >
> > > > >
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Friday, November 09, 2018 1:10 PM
> > > >
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption 
> > > > > Across
> > > >
> > > > > Partitions in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > > Just to check: Will anyone review this? It's been silent for a week...
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Monday, November 05, 2018 4:18 PM
> > > >
> > > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> > > >
> > > > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > > >
> > > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > > > Partitions
> > > >
> > > > > in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > Hi I just put together the KIP page as requested. This email 
> > > > > is to
> > > >
> > > > > start the discussion thread.
> > > >
> > > > >
> > > >
> > > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer<
> > > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=
> > > > > > >
> > > > >
> > > > > > Pull Request:
> > > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > > > >
> > > > > > Jira:
> > > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > > >
> > > > >
> > > >
> > > > > Thanks, CH
> > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
> 
> 
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
> Email had 1 attachment:
> + Re: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a round robin fashion
>   15k (message/rfc822)

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi guys,

I am not sure what to do next in this KIP process. Could anyone please help/advise me on what to do next? 

Thanks, CH


RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi Colin,

Setting max.partition.fetch.bytes was discussed in the ticket. It is less desirable when message sizes are highly variable, and it also decreases the efficiency of network communication.
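
To illustrate the point about variable message sizes, here is a minimal sketch in Python. It is a simplified model and not broker code: the function name records_per_fetch is hypothetical, each message is treated as its own batch, and the first message is assumed to be returned even when it exceeds the cap so that the consumer can make progress.

```python
def records_per_fetch(message_sizes, max_partition_fetch_bytes):
    """Count how many leading messages fit under a per-partition byte cap.

    Simplifying assumptions (not the broker's exact logic): every message is
    its own batch, and the first message is always returned even if it is
    larger than the cap.
    """
    count = 0
    used = 0
    for size in message_sizes:
        # Stop once adding another message would exceed the cap,
        # but never before at least one message has been taken.
        if count > 0 and used + size > max_partition_fetch_bytes:
            break
        used += size
        count += 1
    return count


# Same cap, very different record counts depending on message size.
print(records_per_fetch([10] * 200, 1000))
print(records_per_fetch([900, 50, 50, 500], 1000))
print(records_per_fetch([5000, 10], 1000))
```

Under these assumptions a byte cap bounds the bytes per partition, not the number of records, so the share of records each partition contributes to a poll can still vary widely.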

In the case you mentioned below, where a consumer can get messages from A, B, C and D but currently only has messages from A, B and C, the proposed change will NOT wait until some messages from D arrive before it starts returning messages; it will just serve those from A, B and C, and include those from D when they are available. That IS the current behavior. The proposed change does not impose a strict round robin pattern.
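
A minimal sketch of the difference, written as a standalone Python simulation rather than the actual Fetcher code. The function simulate and its parameters are hypothetical names for illustration; the model assumes each poll fills up to max.poll.records greedily from the head of the completed fetch queue, and that the proposed change simply moves a partially consumed fetch to the back of the queue. It reproduces the A/B example discussed in this thread (two completed fetches of 300 records each, max.poll.records = 100).

```python
from collections import deque


def simulate(fetches, max_poll_records, rotate, arrivals=None):
    """Return, for each poll() call, the partitions it served records from.

    fetches:  list of (partition, record_count) already buffered at the consumer
    rotate:   False models the behavior described as current in this thread;
              True models the proposed change (next poll starts at the next fetch)
    arrivals: optional {poll_index: [(partition, record_count), ...]} for
              fetches that complete later
    """
    queue = deque(fetches)
    arrivals = arrivals or {}
    polls = []
    poll_index = 0
    while queue or any(i >= poll_index for i in arrivals):
        queue.extend(arrivals.get(poll_index, []))
        budget = max_poll_records
        served = []
        leftover = None
        while queue and budget > 0:
            partition, remaining = queue.popleft()
            taken = min(remaining, budget)
            budget -= taken
            served.append(partition)
            if remaining > taken:
                leftover = (partition, remaining - taken)
        if leftover is not None:
            if rotate:
                queue.append(leftover)      # proposed: go to the back of the queue
            else:
                queue.appendleft(leftover)  # current: stay at the head
        polls.append(served)
        poll_index += 1
    return polls


fetches = [("A", 300), ("B", 300)]
print(simulate(fetches, 100, rotate=False))  # [['A'], ['A'], ['A'], ['B'], ['B'], ['B']]
print(simulate(fetches, 100, rotate=True))   # [['A'], ['B'], ['A'], ['B'], ['A'], ['B']]
```

Passing arrivals={2: [("D", 100)]} with fetches for A, B and C shows that, in this model, polls keep returning records from A, B and C and pick up D once its fetch completes, without holding anything back.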

The original KIP 41 discussed "Ensuring Fair Consumption", which means it originally intended to take fairness into account in the consumer code; the proposed change takes the current algorithm closer to that goal, IMHO. I could implement that logic on the caller side, but that would mean each library user needs to know the inner workings of the consumer code and implement the logic on their own. As a first-timer here, I do appreciate the complexity and functionality in the client library, and I feel that we'd be better off as a community implementing the logic in the library so that the complexity is hidden from library users.

Thanks, CH

-----Original Message-----
From: Colin McCabe <cm...@apache.org> 
Sent: Saturday, December 22, 2018 3:53 AM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP here.  You can just set max.partition.fetch.bytes to a very small value.  That will cause Kafka to fetch only one message from each partition.  This will give you the round robin behavior you want.

Alternately, if you don't want to change max.partition.fetch.bytes, you could do your own buffering to get round robin behavior.  Keep a buffer of messages from partition A, B, C, and D and hold back the messages from A, B, and C until one from D arrives, so that the A B C D A B C D... etc. order always repeats.
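
The caller-side buffering described above could look roughly like the following Python sketch. It is only an illustration under stated assumptions: StrictRoundRobinBuffer, add and drain are hypothetical names, records are plain values rather than ConsumerRecord objects, and nothing here calls the Kafka client.

```python
from collections import deque


class StrictRoundRobinBuffer:
    """Caller-side buffer that releases records only in complete rounds."""

    def __init__(self, partitions):
        self.partitions = list(partitions)
        self.buffers = {p: deque() for p in self.partitions}

    def add(self, partition, record):
        """Buffer one record received from the given partition."""
        self.buffers[partition].append(record)

    def drain(self):
        """Return every complete round currently available, in partition order."""
        released = []
        # A round is released only when every partition has at least one record.
        while all(self.buffers[p] for p in self.partitions):
            for p in self.partitions:
                released.append((p, self.buffers[p].popleft()))
        return released


buf = StrictRoundRobinBuffer("ABCD")
for partition, record in [("A", "a1"), ("A", "a2"), ("B", "b1"), ("C", "c1")]:
    buf.add(partition, record)
print(buf.drain())  # [] because nothing has arrived from D yet
buf.add("D", "d1")
print(buf.drain())  # [('A', 'a1'), ('B', 'b1'), ('C', 'c1'), ('D', 'd1')]
```

Note the trade-off this makes visible: records from A, B and C are held back for as long as D is silent, which is the strict ordering the buffer is meant to enforce.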

best,
Colin


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back at the email thread, I think one of the comments from 
> Mayuresh questioned whether this change needs a KIP at all, since 
> the KafkaConsumer does not guarantee the end user any order, and so there 
> are no changes to the contract with users.
> 
> I entered the KIP based on suggestions from the attached email when going 
> through the code contribution process. I am not sure what to do next in 
> this KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity on the client side 
> for a common expectation: to receive records in a more or less fair 
> fashion.
> 
> CH
> 
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for 
> paused topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira : 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org
> _jira_browse_KAFKA-2D7548&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrb
> L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5
> uOWVIftTJ-U&s=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU&e=
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > Pausing and Resuming are cumbersome and has some undesirable 
> > performance impact since pausing will in effect clean up the 
> > completed fetch and resuming will call the broker to retrieve again.
> >
> > The way I changed the code was just to parse the completed fetch 
> > earlier and ensure the order to retrieve are the same as the completed fetch queue.
> > I did make code changes to take into account the following in Fetcher class.
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in 
> > fetchablePartitions
> > 3) clear buffer when not in the newly assigned partitions in 
> > clearBufferedDataForUnassignedPartitions
> > 4) close them properly in close method
> >
> > Though the consumer does not guarantee explicit order, KIP 41 (link
> > below) did intend to ensure fair distribution and therefore the 
> > round robin algorithm in the code. The change I propose was to enhance it.
> >
> >
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.or
> > g_ 
> > confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecor
> > ds 
> > -23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=Dw
> > IF
> > aQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6R
> > A3 
> > V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=NKZHA5HVggfKWlF_
> > yg
> > 6V3-Wyf_Z6x7n1HQPQ1_M0d9A&e=
> >
> > As for performance, the changes does not add any additional calls to 
> > the broker nor does it introduce significant processing logic; it 
> > just parses the completed fetch earlier and have a list to manage them.
> >
> >
> > CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 6:58 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > The other way I was thinking, this can be done outside of 
> > KafkaConsumer is by pausing and resuming TopicPartitions (may be in round robin fashion).
> > There is some gotcha there as in you might not know if the consumer 
> > has already fetched data for the remaining partitions.
> > Also I am not sure, if we need a KIP for this as the KafkaConsumer 
> > does not guarantee the end user, any order, I believe. So if this 
> > change goes in, I don't think its changing the underlying behavior.
> > It would be good to check if this change will impact the performance 
> > of the consumer.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu 
> > <ch...@opentext.com>
> > wrote:
> >
> > > Hi Mayuresh,
> > >
> > > To serve one poll call the logic greedily gets records from one 
> > > completed fetch before including records from the next completed 
> > > fetch from the queue, as you described.
> > >
> > > The algorithm remembers the current completed fetch as starting 
> > > one when serving the next poll call. The net effect is that 
> > > completed fetch will be retrieved to serve as many poll calls 
> > > before retrieving records from any other completed fetches.
> > >
> > > For example, let's say the consumer has been assigned partition A, 
> > > B and C and the max.poll.records is set to 100. Right now we have 
> > > completed fetch A, and B. Each one has 300 records. It will take 6 
> > > poll calls to retrieve all record and the sequence of retrieved 
> > > partitions will be: A, A, A, B, B, B.
> > >
> > > Ideally, it should alternate between A and B. I was proposing to 
> > > move to the next one fetch for the next poll call based on the 
> > > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > The implementation parses the completed fetch only once.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > Thanks for the KIP.
> > > It would be great if you can explain with an example, what you mean by "
> > > Currently the implementation will return available records 
> > > starting from the last partition the last poll call retrieves records from.
> > > This leads to unfair patterns of record consumption from multiple
> > partitions."
> > >
> > > KafkaConsumer would send fetch requests to multiple brokers and 
> > > then gets the corresponding responses and puts them in to a single 
> > > queue of CompletedFetches. IT then iterates over these completed 
> > > fetches queue and peels of number of records = max.poll.records 
> > > from each completedFetch for each poll() before moving on to next 
> > > completedFetch. Also it does not send a fetch request for a 
> > > TopicPartition, if we already have a buffered data (completedFetch 
> > > or
> > > nextInlineRecord) for that TopicPartition. It also moves the 
> > > TopicPartition to the end of the assignment queue, once it has 
> > > received data from broker for that TopicPartition, to maintain 
> > > round
> > robin fetch sequence for fairness.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > >
> > > >
> > > > KIP 41 was initiated by you and this KIP is to change the logic 
> > > > discussed in the Ensure Fair Consumption<
> > > >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.
> > > or
> > > g_
> > > confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRec
> > > or
> > > ds
> > > -23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=
> > > Dw
> > > IF
> > > aQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg
> > > 6R
> > > A3
> > > V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQ
> > > hn
> > > -o
> > > SZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > > >.
> > > > Your input on KIP-387<
> > > >
> > > >https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache
> > > >.o
> > > >rg
> > > >_
> > > >confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsum
> > > >pt
> > > >io
> > > >n
> > > >-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2
> > > >P1
> > > >-X
> > > >D
> > > >AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrReh
> > > >ja
> > > >ys
> > > >S
> > > >ML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNln
> > > >Nf
> > > >0k
> > > >T
> > > > tRlgk&e=>
> > > > would be very valuable.
> > > >
> > > >
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > Any comments/updates? I am not sure the next steps if no one has 
> > > > any further comments.
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: ChienHsing Wu
> > > > <ch...@opentext.com>>
> > > >
> > > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi Matt,
> > > >
> > > >
> > > >
> > > > Thanks for the feedback.
> > > >
> > > >
> > > >
> > > > The issue with the current design is that it stays on the 
> > > > previous partition even if the last poll call consumes the 
> > > > max.poll.records; it will consume all records in that partition 
> > > > available at the consumer side to serve multiple poll calls 
> > > > before moving to the next
> > partition.
> > > >
> > > >
> > > >
> > > > Introducing another threshold at partition level will decrease 
> > > > the number of records consumed in one partition within one poll 
> > > > call but will still use that same partition as the starting one 
> > > > in the next poll
> > > call.
> > > >
> > > >
> > > >
> > > > The same effect can be achieved by setting max.poll.records to 
> > > > 100 I believe. The main difference is that the client will need 
> > > > to make more poll calls when that value is set to 100, and 
> > > > because of the non-blocking nature I believe the cost of extra 
> > > > poll calls are not
> > > significant.
> > > >
> > > >
> > > >
> > > > Further thoughts?
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: Matt Farmer <ma...@frmr.me>>
> > > >
> > > > Sent: Monday, November 19, 2018 9:32 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi there,
> > > >
> > > >
> > > >
> > > > Thanks for the KIP.
> > > >
> > > >
> > > >
> > > > We’ve run into issues with this at Mailchimp so something to 
> > > > address consuming behavior would save us from having to always 
> > > > ensure we’re running enough consumers that each consumer has 
> > > > only one partition (which is our usual MO).
> > > >
> > > >
> > > >
> > > > I wonder though if it would be simpler and more powerful to 
> > > > define the maximum number of records the consumer should pull 
> > > > from one partition before pulling some records from another?
> > > >
> > > >
> > > >
> > > > So if you set max.poll.records to 500 and then some new setting, 
> > > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > > switch what partition it reads from every 100 records - looping 
> > > > back around to the first partition that had records if there 
> > > > aren’t 5 or more partitions with records.
> > > >
> > > >
> > > >
> > > > What do you think?
> > > >
> > > >
> > > >
> > > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > > >
> > > >
> > > >
> > > > > Hi, could anyone please review this KIP?
> > > >
> > > > >
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Friday, November 09, 2018 1:10 PM
> > > >
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption 
> > > > > Across
> > > >
> > > > > Partitions in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > Just to check: Will anyone review this? It's been silent for a
> > week...
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Monday, November 05, 2018 4:18 PM
> > > >
> > > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> > > >
> > > > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > > >
> > > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > > > Partitions
> > > >
> > > > > in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > Hi I just put together the KIP page as requested. This email 
> > > > > is to
> > > >
> > > > > start the discussion thread.
> > > >
> > > > >
> > > >
> > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> > > >
> > > > > KafkaConsumer<
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.
> > > > > or
> > > > > g_
> > > >
> > > > > confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BCon
> > > > > su
> > > > > mp
> > > > > ti
> > > > > on
> > > >
> > > > > -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3
> > > > > mf
> > > > > 2P
> > > > > 1-
> > > > > XD
> > > >
> > > > > AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjf
> > > > > cr
> > > > > k8
> > > > > Xi
> > > > > oE
> > > >
> > > > > m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ
> > > > > _l
> > > > > NP
> > > > > 1b
> > > > > F5
> > > >
> > > > > 49_KU&e=
> > > >
> > > > > >
> > > >
> > > > > Pull Request:
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.co
> > > > > m_
> > > > > ap
> > > > > ac
> > > > > he
> > > >
> > > > > _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL
> > > > > 9T
> > > > > oL
> > > > > W0
> > > > > OF
> > > >
> > > > > yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzV
> > > > > rp
> > > > > 5t
> > > > > BK
> > > > > 7L
> > > >
> > > > > X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > > >
> > > > > Jira:
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.ap
> > > > > ac
> > > > > he
> > > > > .o
> > > > > rg
> > > >
> > > > > _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=
> > > > > Az
> > > > > 03
> > > > > wM
> > > > > rb
> > > >
> > > > > L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKY
> > > > > wi
> > > > > 8c
> > > > > 8Y
> > > > > rz
> > > >
> > > > > Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > > >
> > > > >
> > > >
> > > > > Thanks, CH
> > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
> 
> 
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
> Email had 1 attachment:
> + Re: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a 
> + round
> robin fashion
>   15k (message/rfc822)

Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by Colin McCabe <cm...@apache.org>.
Hi ChienHsing Wu,

Maybe I'm misunderstanding something, but I'm not sure I see the need for a KIP here.  You can just set max.partition.fetch.bytes to a very small value.  That will cause Kafka to fetch only one message from each partition.  This will give you the round robin behavior you want.

Alternately, if you don't want to change max.partition.fetch.bytes, you could do your own buffering to get round robin behavior.  Keep a buffer of messages from partition A, B, C, and D and hold back the messages from A, B, and C until one from D arrives, so that the A B C D A B C D... etc. order always repeats.
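
The buffering idea in the paragraph above could be sketched in plain Java roughly as follows. This is only an illustration under assumptions: RoundRobinBuffer, add and drainRounds are hypothetical names, partitions are identified by strings, and messages are plain strings rather than ConsumerRecord objects. Note that this sketch holds everything back until every partition has at least one message, so a real implementation would also need a policy for partitions that stay idle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinBuffer {
    // One FIFO queue per partition, kept in a fixed partition order.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    public RoundRobinBuffer(List<String> partitions) {
        for (String partition : partitions) {
            queues.put(partition, new ArrayDeque<>());
        }
    }

    // Buffer a message that arrived from the given partition.
    public void add(String partition, String message) {
        Deque<String> queue = queues.get(partition);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown partition: " + partition);
        }
        queue.addLast(message);
    }

    // Release as many complete rounds (one message per partition, in
    // partition order) as the buffer currently allows.
    public List<String> drainRounds() {
        List<String> released = new ArrayList<>();
        while (everyPartitionHasMessage()) {
            for (Deque<String> queue : queues.values()) {
                released.add(queue.pollFirst());
            }
        }
        return released;
    }

    private boolean everyPartitionHasMessage() {
        if (queues.isEmpty()) {
            return false; // no partitions: nothing to release
        }
        for (Deque<String> queue : queues.values()) {
            if (queue.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RoundRobinBuffer buffer = new RoundRobinBuffer(Arrays.asList("A", "B", "C", "D"));
        buffer.add("A", "a1");
        buffer.add("A", "a2");
        buffer.add("B", "b1");
        buffer.add("C", "c1");
        System.out.println(buffer.drainRounds()); // [] because D has nothing yet
        buffer.add("D", "d1");
        System.out.println(buffer.drainRounds()); // [a1, b1, c1, d1]
    }
}
```

The caller would feed each polled record into add and process only what drainRounds returns, which is exactly the extra client-side bookkeeping this thread is debating.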

best,
Colin


On Wed, Dec 19, 2018, at 09:00, ChienHsing Wu wrote:
> Looking back through the email thread, I think one of Mayuresh's comments 
> questioned whether this change needs a KIP at all, since the 
> KafkaConsumer does not guarantee the end user any order and so the change 
> does not alter the contract with users.
> 
> I entered the KIP based on suggestions from the attached email when going 
> through the code contribution process. I am not sure what to do next in this 
> KIP process. Could anyone please help/advise me on what to do next?
> 
> Thanks!
> 
> CH
> 
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com> 
> Sent: Wednesday, December 12, 2018 1:05 PM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Good to know that, Thanks! 
> 
> Nonetheless, that introduces additional complexity on the client side 
> for the common expectation of receiving records in a more or less fair 
> fashion.
> 
> CH
> 
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Wednesday, December 12, 2018 12:55 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
> 
> Hi ChienHsing,
> 
> We are actually working on buffering the already fetched data for paused 
> topicPartitions, so ideally it should not have any effect on 
> performance.
> Associated jira : 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D7548&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU&e=
> 
> Thanks,
> 
> Mayuresh
> 
> On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:
> 
> > Hi Mayuresh,
> >
> > Thanks for the input!
> >
> > > Pausing and resuming are cumbersome and have some undesirable 
> > > performance impact, since pausing will in effect clean up the completed 
> > > fetch and resuming will call the broker to retrieve it again.
> > >
> > > The way I changed the code was just to parse the completed fetch 
> > > earlier and ensure the retrieval order is the same as the completed fetch queue.
> > > I did make code changes to take into account the following in the Fetcher class.
> >
> > 1) exception handling
> > 2) ensure the parsed partitions are not included in 
> > fetchablePartitions
> > 3) clear buffer when not in the newly assigned partitions in 
> > clearBufferedDataForUnassignedPartitions
> > 4) close them properly in close method
> >
> > Though the consumer does not guarantee explicit order, KIP 41 (link
> > below) did intend to ensure fair distribution and therefore the round 
> > robin algorithm in the code. The change I propose was to enhance it.
> >
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=NKZHA5HVggfKWlF_yg6V3-Wyf_Z6x7n1HQPQ1_M0d9A&e=
> >
> > > As for performance, the changes do not add any additional calls to 
> > > the broker, nor do they introduce significant processing logic; they just 
> > > parse the completed fetch earlier and keep a list to manage them.
> >
> >
> > CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 6:58 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > > Another way I was thinking this could be done, outside of 
> > > KafkaConsumer, is by pausing and resuming TopicPartitions (maybe in round robin fashion).
> > > There is a gotcha there, in that you might not know whether the consumer 
> > > has already fetched data for the remaining partitions.
> > > Also, I am not sure we need a KIP for this, as the KafkaConsumer 
> > > does not guarantee the end user any order, I believe. So if this 
> > > change goes in, I don't think it's changing the underlying behavior.
> > > It would be good to check if this change will impact the performance 
> > > of the consumer.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <ch...@opentext.com>
> > wrote:
> >
> > > Hi Mayuresh,
> > >
> > > To serve one poll call the logic greedily gets records from one 
> > > completed fetch before including records from the next completed 
> > > fetch from the queue, as you described.
> > >
> > > > The algorithm remembers the current completed fetch as the starting one 
> > > > when serving the next poll call. The net effect is that one completed 
> > > > fetch will serve as many poll calls as it can before records are 
> > > > retrieved from any other completed fetch.
> > > >
> > > > For example, let's say the consumer has been assigned partitions A, B 
> > > > and C and max.poll.records is set to 100. Right now we have 
> > > > completed fetches for A and B. Each one has 300 records. It will take 6 
> > > > poll calls to retrieve all records and the sequence of retrieved 
> > > > partitions will be: A, A, A, B, B, B.
> > > >
> > > > Ideally, it should alternate between A and B. I was proposing to 
> > > > move to the next fetch for the next poll call based on the order 
> > > > in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > > > The implementation parses the completed fetch only once.
> > >
> > > Thanks, CH
> > >
> > > -----Original Message-----
> > > From: Mayuresh Gharat <gh...@gmail.com>
> > > Sent: Tuesday, December 11, 2018 1:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > > Hi ChienHsing,
> > >
> > > Thanks for the KIP.
> > > > It would be great if you can explain with an example what you mean by
> > > > "Currently the implementation will return available records starting 
> > > > from the last partition the last poll call retrieves records from.
> > > > This leads to unfair patterns of record consumption from multiple
> > > > partitions."
> > > >
> > > > KafkaConsumer would send fetch requests to multiple brokers, then 
> > > > get the corresponding responses and put them into a single queue 
> > > > of CompletedFetches. It then iterates over this queue of completed 
> > > > fetches and peels off a number of records = max.poll.records from each 
> > > > completedFetch for each poll() before moving on to the next 
> > > > completedFetch. Also, it does not send a fetch request for a 
> > > > TopicPartition if we already have buffered data (completedFetch 
> > > > or nextInlineRecord) for that TopicPartition. It also moves the 
> > > > TopicPartition to the end of the assignment queue once it has 
> > > > received data from the broker for that TopicPartition, to maintain a
> > > > round robin fetch sequence for fairness.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > > <ch...@opentext.com>
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > >
> > > >
> > > > > KIP 41 was initiated by you and this KIP is to change the logic 
> > > > > discussed in the Ensure Fair Consumption
> > > > > <https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=>.
> > > > > Your input on KIP-387
> > > > > <https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=>
> > > > > would be very valuable.
> > > >
> > > >
> > > >
> > > > Thanks, ChienHsing
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: ChienHsing Wu <ch...@opentext.com>
> > > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > Any comments/updates? I am not sure the next steps if no one has 
> > > > any further comments.
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: ChienHsing Wu
> > > > <ch...@opentext.com>>
> > > >
> > > > Sent: Tuesday, November 20, 2018 2:46 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi Matt,
> > > >
> > > >
> > > >
> > > > Thanks for the feedback.
> > > >
> > > >
> > > >
> > > > The issue with the current design is that it stays on the previous 
> > > > partition even if the last poll call consumes the 
> > > > max.poll.records; it will consume all records in that partition 
> > > > available at the consumer side to serve multiple poll calls before 
> > > > moving to the next
> > partition.
> > > >
> > > >
> > > >
> > > > Introducing another threshold at partition level will decrease the 
> > > > number of records consumed in one partition within one poll call 
> > > > but will still use that same partition as the starting one in the 
> > > > next poll
> > > call.
> > > >
> > > >
> > > >
> > > > The same effect can be achieved by setting max.poll.records to 100 
> > > > I believe. The main difference is that the client will need to 
> > > > make more poll calls when that value is set to 100, and because of 
> > > > the non-blocking nature I believe the cost of extra poll calls are 
> > > > not
> > > significant.
> > > >
> > > >
> > > >
> > > > Further thoughts?
> > > >
> > > >
> > > >
> > > > Thanks, CH
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > >
> > > > From: Matt Farmer <ma...@frmr.me>>
> > > >
> > > > Sent: Monday, November 19, 2018 9:32 PM
> > > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > > Consumption Across Partitions in KafkaConsumer
> > > >
> > > >
> > > >
> > > > Hi there,
> > > >
> > > >
> > > >
> > > > Thanks for the KIP.
> > > >
> > > >
> > > >
> > > > We’ve run into issues with this at Mailchimp so something to 
> > > > address consuming behavior would save us from having to always 
> > > > ensure we’re running enough consumers that each consumer has only 
> > > > one partition (which is our usual MO).
> > > >
> > > >
> > > >
> > > > I wonder though if it would be simpler and more powerful to define 
> > > > the maximum number of records the consumer should pull from one 
> > > > partition before pulling some records from another?
> > > >
> > > >
> > > >
> > > > So if you set max.poll.records to 500 and then some new setting, 
> > > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > > switch what partition it reads from every 100 records - looping 
> > > > back around to the first partition that had records if there 
> > > > aren’t 5 or more partitions with records.
> > > >
> > > >
> > > >
> > > > What do you think?
> > > >
> > > >
> > > >
> > > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > > >
> > > >
> > > >
> > > > > Hi, could anyone please review this KIP?
> > > >
> > > > >
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Friday, November 09, 2018 1:10 PM
> > > >
> > > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > > >
> > > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> > > >
> > > > > Partitions in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > Just to check: Will anyone review this? It's been silent for a
> > week...
> > > >
> > > > > Thanks, ChienHsing
> > > >
> > > > >
> > > >
> > > > > From: ChienHsing Wu
> > > >
> > > > > Sent: Monday, November 05, 2018 4:18 PM
> > > >
> > > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> > > >
> > > > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > > >
> > > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > > > Partitions
> > > >
> > > > > in KafkaConsumer
> > > >
> > > > >
> > > >
> > > > > > Hi I just put together the KIP page as requested. This email is 
> > > > > > to start the discussion thread.
> > > > > >
> > > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> > > > > > KafkaConsumer
> > > > > > <https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=>
> > > > > > Pull Request:
> > > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > > > > > Jira:
> > > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > > > > >
> > > > > > Thanks, CH
> > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
> 
> 
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
> Email had 1 attachment:
> + Re: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a round 
> robin fashion
>   15k (message/rfc822)

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Looking back through the email thread, I think one of Mayuresh's comments questioned whether this change needs a KIP at all, since the KafkaConsumer does not guarantee the end user any order and so the change does not alter the contract with users.

I entered the KIP based on suggestions from the attached email when going through the code contribution process. I am not sure what to do next in this KIP process. Could anyone please help/advise me on what to do next?

Thanks!

CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com> 
Sent: Wednesday, December 12, 2018 1:05 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Good to know that, Thanks! 

Nonetheless, that introduces additional complexity on the client side for the common expectation of receiving records in a more or less fair fashion.

CH

-----Original Message-----
From: Mayuresh Gharat <gh...@gmail.com>
Sent: Wednesday, December 12, 2018 12:55 PM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing,

We are actually working on buffering the already fetched data for paused topicPartitions, so ideally it should not have any effect on performance.
Associated jira : https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D7548&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=_ERDVQqqt9Grnxt7DDO_gC9CvpD_ylhH8ZoHLwSXEpU&e=

Thanks,

Mayuresh

On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Hi Mayuresh,
>
> Thanks for the input!
>
> Pausing and Resuming are cumbersome and has some undesirable 
> performance impact since pausing will in effect clean up the completed 
> fetch and resuming will call the broker to retrieve again.
>
> The way I changed the code was just to parse the completed fetch 
> earlier and ensure the order to retrieve are the same as the completed fetch queue.
> I did make code changes to take into account the following in Fetcher class.
>
> 1) exception handling
> 2) ensure the parsed partitions are not included in 
> fetchablePartitions
> 3) clear buffer when not in the newly assigned partitions in 
> clearBufferedDataForUnassignedPartitions
> 4) close them properly in close method
>
> Though the consumer does not guarantee explicit order, KIP 41 (link
> below) did intend to ensure fair distribution and therefore the round 
> robin algorithm in the code. The change I propose was to enhance it.
>
>
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_
> confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords
> -23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIF
> aQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3
> V7I&m=7eC1W-f8nKkXMGJti3n0zF4qDV0af8y5uOWVIftTJ-U&s=NKZHA5HVggfKWlF_yg
> 6V3-Wyf_Z6x7n1HQPQ1_M0d9A&e=
>
> As for performance, the changes does not add any additional calls to 
> the broker nor does it introduce significant processing logic; it just 
> parses the completed fetch earlier and have a list to manage them.
>
>
> CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Tuesday, December 11, 2018 6:58 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> The other way I was thinking, this can be done outside of 
> KafkaConsumer is by pausing and resuming TopicPartitions (may be in round robin fashion).
> There is some gotcha there as in you might not know if the consumer 
> has already fetched data for the remaining partitions.
> Also I am not sure, if we need a KIP for this as the KafkaConsumer 
> does not guarantee the end user, any order, I believe. So if this 
> change goes in, I don't think its changing the underlying behavior.
> It would be good to check if this change will impact the performance 
> of the consumer.
>
> Thanks,
>
> Mayuresh
>
>
> On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <ch...@opentext.com>
> wrote:
>
> > Hi Mayuresh,
> >
> > To serve one poll call the logic greedily gets records from one 
> > completed fetch before including records from the next completed 
> > fetch from the queue, as you described.
> >
> > The algorithm remembers the current completed fetch as starting one 
> > when serving the next poll call. The net effect is that completed 
> > fetch will be retrieved to serve as many poll calls before 
> > retrieving records from any other completed fetches.
> >
> > For example, let's say the consumer has been assigned partition A, B 
> > and C and the max.poll.records is set to 100. Right now we have 
> > completed fetch A, and B. Each one has 300 records. It will take 6 
> > poll calls to retrieve all record and the sequence of retrieved 
> > partitions will be: A, A, A, B, B, B.
> >
> > Ideally, it should alternate between A and B. I was proposing to 
> > move to the next one fetch for the next poll call based on the order 
> > in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > The implementation parses the completed fetch only once.
> >
> > Thanks, CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 1:21 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > Thanks for the KIP.
> > It would be great if you can explain with an example, what you mean by "
> > Currently the implementation will return available records starting 
> > from the last partition the last poll call retrieves records from.
> > This leads to unfair patterns of record consumption from multiple
> partitions."
> >
> > KafkaConsumer would send fetch requests to multiple brokers and then 
> > gets the corresponding responses and puts them in to a single queue 
> > of CompletedFetches. IT then iterates over these completed fetches 
> > queue and peels of number of records = max.poll.records from each 
> > completedFetch for each poll() before moving on to next 
> > completedFetch. Also it does not send a fetch request for a 
> > TopicPartition, if we already have a buffered data (completedFetch 
> > or
> > nextInlineRecord) for that TopicPartition. It also moves the 
> > TopicPartition to the end of the assignment queue, once it has 
> > received data from broker for that TopicPartition, to maintain round
> robin fetch sequence for fairness.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > <ch...@opentext.com>
> > wrote:
> >
> > > Jason,
> > >
> > >
> > >
> > > KIP 41 was initiated by you and this KIP is to change the logic 
> > > discussed in the Ensure Fair Consumption<
> > >
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.or
> > g_
> > confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecor
> > ds
> > -23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=Dw
> > IF
> > aQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6R
> > A3
> > V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn
> > -o
> > SZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > >.
> > > Your input on KIP-387<
> > >
> > >https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.o
> > >rg
> > >_
> > >confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumpt
> > >io
> > >n
> > >-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1
> > >-X
> > >D
> > >AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehja
> > >ys
> > >S
> > >ML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf
> > >0k
> > >T
> > > tRlgk&e=>
> > > would be very valuable.
> > >
> > >
> > >
> > > Thanks, ChienHsing
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: ChienHsing Wu <ch...@opentext.com>
> > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi,
> > >
> > >
> > >
> > > Any comments/updates? I am not sure the next steps if no one has 
> > > any further comments.
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: ChienHsing Wu
> > > <ch...@opentext.com>>
> > >
> > > Sent: Tuesday, November 20, 2018 2:46 PM
> > >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi Matt,
> > >
> > >
> > >
> > > Thanks for the feedback.
> > >
> > >
> > >
> > > The issue with the current design is that it stays on the previous 
> > > partition even if the last poll call consumes the 
> > > max.poll.records; it will consume all records in that partition 
> > > available at the consumer side to serve multiple poll calls before 
> > > moving to the next
> partition.
> > >
> > >
> > >
> > > Introducing another threshold at partition level will decrease the 
> > > number of records consumed in one partition within one poll call 
> > > but will still use that same partition as the starting one in the 
> > > next poll
> > call.
> > >
> > >
> > >
> > > The same effect can be achieved by setting max.poll.records to 100 
> > > I believe. The main difference is that the client will need to 
> > > make more poll calls when that value is set to 100, and because of 
> > > the non-blocking nature I believe the cost of extra poll calls are 
> > > not
> > significant.
> > >
> > >
> > >
> > > Further thoughts?
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: Matt Farmer <ma...@frmr.me>>
> > >
> > > Sent: Monday, November 19, 2018 9:32 PM
> > >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi there,
> > >
> > >
> > >
> > > Thanks for the KIP.
> > >
> > >
> > >
> > > We’ve run into issues with this at Mailchimp so something to 
> > > address consuming behavior would save us from having to always 
> > > ensure we’re running enough consumers that each consumer has only 
> > > one partition (which is our usual MO).
> > >
> > >
> > >
> > > I wonder though if it would be simpler and more powerful to define 
> > > the maximum number of records the consumer should pull from one 
> > > partition before pulling some records from another?
> > >
> > >
> > >
> > > So if you set max.poll.records to 500 and then some new setting, 
> > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > switch what partition it reads from every 100 records - looping 
> > > back around to the first partition that had records if there 
> > > aren’t 5 or more partitions with records.
> > >
> > >
> > >
> > > What do you think?
> > >
> > >
> > >
> > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu 
> > > <chienhsw@opentext.com <ma...@opentext.com>> wrote:
> > >
> > >
> > >
> > > > Hi, could anyone please review this KIP?
> > >
> > > >
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Friday, November 09, 2018 1:10 PM
> > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> > >
> > > > Partitions in KafkaConsumer
> > >
> > > >
> > >
> > > > Just to check: Will anyone review this? It's been silent for a
> week...
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Monday, November 05, 2018 4:18 PM
> > >
> > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> > >
> > > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > >
> > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > > Partitions
> > >
> > > > in KafkaConsumer
> > >
> > > >
> > >
> > > > Hi I just put together the KIP page as requested. This email is 
> > > > to
> > >
> > > > start the discussion thread.
> > >
> > > >
> > >
> > > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> > >
> > > > KafkaConsumer<
> > >
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.
> > > > or
> > > > g_
> > >
> > > > confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsu
> > > > mp
> > > > ti
> > > > on
> > >
> > > > -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf
> > > > 2P
> > > > 1-
> > > > XD
> > >
> > > > AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcr
> > > > k8
> > > > Xi
> > > > oE
> > >
> > > > m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_l
> > > > NP
> > > > 1b
> > > > F5
> > >
> > > > 49_KU&e=
> > >
> > > > >
> > >
> > > > Pull Request:
> > >
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_
> > > > ap
> > > > ac
> > > > he
> > >
> > > > _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9T
> > > > oL
> > > > W0
> > > > OF
> > >
> > > > yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp
> > > > 5t
> > > > BK
> > > > 7L
> > >
> > > > X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > >
> > > > Jira:
> > >
> > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apac
> > > > he
> > > > .o
> > > > rg
> > >
> > > > _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az
> > > > 03
> > > > wM
> > > > rb
> > >
> > > > L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi
> > > > 8c
> > > > 8Y
> > > > rz
> > >
> > > > Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > >
> > > >
> > >
> > > > Thanks, CH
> > >
> > > >
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


--
-Regards,
Mayuresh R. Gharat
(862) 250-7125

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Good to know that, Thanks! 

Nonetheless, that introduces additional complexity on the client side just to meet a common expectation: receiving records from the assigned partitions in a more or less fair fashion.

CH


Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by Mayuresh Gharat <gh...@gmail.com>.
Hi ChienHsing,

We are actually working on buffering the already fetched data for paused
topicPartitions, so ideally it should not have any effect on performance.
Associated jira : https://issues.apache.org/jira/browse/KAFKA-7548

Thanks,

Mayuresh

On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Hi Mayuresh,
>
> Thanks for the input!
>
> Pausing and resuming is cumbersome and has an undesirable performance
> impact: pausing in effect discards the completed fetch, and resuming
> calls the broker to retrieve the same data again.
>
> My code change just parses each completed fetch earlier and ensures that
> records are retrieved in the same order as the completed fetch queue.
> To support that, I changed the Fetcher class to take the following into
> account:
>
> 1) exception handling
> 2) ensure the parsed partitions are not included in fetchablePartitions
> 3) clear the buffer for partitions that are not in the new assignment, in
> clearBufferedDataForUnassignedPartitions
> 4) close the parsed fetches properly in the close method
>
> Though the consumer does not guarantee an explicit order, KIP-41 (link
> below) did intend to ensure fair distribution, hence the round robin
> algorithm in the code. The change I propose enhances it.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
>
> As for performance, the change does not add any calls to the broker, nor
> does it introduce significant processing logic; it just parses the
> completed fetches earlier and keeps a list to manage them.
>
>
> CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Tuesday, December 11, 2018 6:58 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> Another approach I was considering, which can be done outside of
> KafkaConsumer, is pausing and resuming TopicPartitions (maybe in round
> robin fashion). One gotcha is that you might not know whether the consumer
> has already fetched data for the remaining partitions.
> Also, I am not sure we need a KIP for this, as I believe the KafkaConsumer
> does not guarantee the end user any order. So if this change goes in, I
> don't think it changes the underlying behavior.
> It would be good to check whether this change will impact the performance
> of the consumer.
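
As an illustration of the pause/resume idea above, here is a minimal sketch in plain Java. It only computes which partition would stay resumed and which would be paused on each turn; the class and method names (PauseResumeRotation, resumedOn, pausedOn) are hypothetical and not part of Kafka. In a real client the two results would presumably be handed to KafkaConsumer.resume(...) and KafkaConsumer.pause(...) before each poll() call, and the gotcha mentioned above still applies: the consumer may already hold fetched data for the partitions being paused.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: plans a round robin pause/resume schedule. */
final class PauseResumeRotation {

    /** Returns the single partition left resumed on the given turn. */
    static <T> T resumedOn(List<T> assigned, long turn) {
        if (assigned.isEmpty()) {
            throw new IllegalArgumentException("no assigned partitions");
        }
        // floorMod keeps the index non-negative even for negative turns.
        int index = (int) Math.floorMod(turn, (long) assigned.size());
        return assigned.get(index);
    }

    /** Returns every assigned partition except the one resumed on this turn. */
    static <T> List<T> pausedOn(List<T> assigned, long turn) {
        T resumed = resumedOn(assigned, turn);
        List<T> paused = new ArrayList<>(assigned);
        paused.remove(resumed);
        return paused;
    }
}
```

Incrementing the turn counter once per poll() gives each partition one poll in every cycle, at the cost of the extra client-side bookkeeping discussed in this thread.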
>
> Thanks,
>
> Mayuresh
>
>
> On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <ch...@opentext.com>
> wrote:
>
> > Hi Mayuresh,
> >
> > To serve one poll call, the logic greedily gets records from one
> > completed fetch before including records from the next completed fetch
> > in the queue, as you described.
> >
> > The algorithm remembers the current completed fetch and starts from it
> > when serving the next poll call. The net effect is that one completed
> > fetch serves as many poll calls as it can before records are retrieved
> > from any other completed fetch.
> >
> > For example, let's say the consumer has been assigned partitions A, B
> > and C, and max.poll.records is set to 100. Right now we have completed
> > fetches for A and B, each with 300 records. It will take 6 poll calls
> > to retrieve all the records, and the sequence of retrieved partitions
> > will be: A, A, A, B, B, B.
> >
> > Ideally, it should alternate between A and B. I was proposing to move
> > to the next completed fetch for the next poll call, based on the order
> > in the completed fetch queue, so the sequence becomes A, B, A, B, A, B.
> > The implementation parses each completed fetch only once.
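
The A/B example above can be reproduced with a small simulation. This is a toy model in plain Java, not the actual Fetcher code: the names PollOrderSketch, BufferedFetch and pollOrder are made up for illustration, and it assumes each poll() is served from a single completed fetch, which holds for the numbers in the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of draining buffered fetches; not the real Fetcher code. */
final class PollOrderSketch {

    /** One buffered fetch: a partition name plus the records still unread. */
    static final class BufferedFetch {
        final String partition;
        int remaining;

        BufferedFetch(String partition, int remaining) {
            this.partition = partition;
            this.remaining = remaining;
        }
    }

    /**
     * Simulates poll() calls until the buffer is empty and returns the
     * partition served by each call. With rotate=false the head fetch keeps
     * its place until drained (the behavior described in the thread); with
     * rotate=true a fetch that still has records moves to the tail after
     * each call (the proposed behavior).
     */
    static List<String> pollOrder(Map<String, Integer> buffered, int maxPollRecords, boolean rotate) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        Deque<BufferedFetch> queue = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            if (e.getValue() > 0) {
                queue.add(new BufferedFetch(e.getKey(), e.getValue()));
            }
        }
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            BufferedFetch head = queue.peekFirst();
            head.remaining -= Math.min(maxPollRecords, head.remaining);
            order.add(head.partition);
            if (head.remaining == 0) {
                queue.pollFirst();                // fully drained, drop it
            } else if (rotate) {
                queue.addLast(queue.pollFirst()); // give the next fetch a turn
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, Integer> buffered = new LinkedHashMap<>();
        buffered.put("A", 300);
        buffered.put("B", 300);
        System.out.println(pollOrder(buffered, 100, false)); // [A, A, A, B, B, B]
        System.out.println(pollOrder(buffered, 100, true));  // [A, B, A, B, A, B]
    }
}
```

Both variants issue the same six poll calls and read the same records; only the order in which partitions are served differs, which matches the claim that the proposal adds no extra broker calls.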
> >
> > Thanks, CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gh...@gmail.com>
> > Sent: Tuesday, December 11, 2018 1:21 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > Thanks for the KIP.
> > It would be great if you could explain, with an example, what you mean
> > by "Currently the implementation will return available records starting
> > from the last partition the last poll call retrieves records from.
> > This leads to unfair patterns of record consumption from multiple
> > partitions."
> >
> > KafkaConsumer sends fetch requests to multiple brokers, gets the
> > corresponding responses, and puts them into a single queue of
> > CompletedFetches. It then iterates over this queue and peels off up to
> > max.poll.records records from each completedFetch for each poll()
> > before moving on to the next completedFetch. Also, it does not send a
> > fetch request for a TopicPartition if we already have buffered data
> > (completedFetch or nextInlineRecord) for that TopicPartition. It also
> > moves the TopicPartition to the end of the assignment queue once it has
> > received data from the broker for that TopicPartition, to maintain a
> > round robin fetch sequence for fairness.
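
The two fetch-side rules described above (skip partitions that already have buffered data, and move a partition to the end of the assignment queue once its data arrives) can be sketched as follows. This is a simplified model in plain Java under the assumption that partitions are identified by name; FetchSelectionSketch, fetchable and moveToEnd are illustrative names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Simplified model of fetch selection; not the real consumer internals. */
final class FetchSelectionSketch {

    /** Partitions eligible for a new fetch request: assigned ones with nothing buffered. */
    static List<String> fetchable(List<String> assignmentOrder, Set<String> buffered) {
        List<String> result = new ArrayList<>();
        for (String partition : assignmentOrder) {
            if (!buffered.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }

    /** Moves a partition that just received data to the end of the assignment order. */
    static List<String> moveToEnd(List<String> assignmentOrder, String partition) {
        List<String> result = new ArrayList<>(assignmentOrder);
        if (result.remove(partition)) {
            result.add(partition); // only re-add partitions that were actually assigned
        }
        return result;
    }
}
```

Note that these rules govern the order in which partitions are fetched from the brokers; the fairness concern raised in this thread is about the order in which already buffered fetches are handed out by poll().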
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <ch...@opentext.com>
> > wrote:
> >
> > > Jason,
> > >
> > >
> > >
> > > KIP 41 was initiated by you and this KIP is to change the logic
> > > discussed in the Ensure Fair Consumption<
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > > >.
> > > Your input on KIP-387<
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=>
> > > would be very valuable.
> > >
> > >
> > >
> > > Thanks, ChienHsing
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: ChienHsing Wu <ch...@opentext.com>
> > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi,
> > >
> > >
> > >
> > > Any comments/updates? I am not sure the next steps if no one has any
> > > further comments.
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: ChienHsing Wu
> > > <ch...@opentext.com>>
> > >
> > > Sent: Tuesday, November 20, 2018 2:46 PM
> > >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi Matt,
> > >
> > >
> > >
> > > Thanks for the feedback.
> > >
> > >
> > >
> > > > The issue with the current design is that it stays on the previous
> > > > partition even if the last poll call consumes the max.poll.records;
> > > > it will consume all records in that partition available at the
> > > > consumer side to serve multiple poll calls before moving to the next
> > > > partition.
> > > >
> > > >
> > > >
> > > > Introducing another threshold at partition level will decrease the
> > > > number of records consumed in one partition within one poll call but
> > > > will still use that same partition as the starting one in the next
> > > > poll call.
> > > >
> > > >
> > > >
> > > > The same effect can be achieved by setting max.poll.records to 100 I
> > > > believe. The main difference is that the client will need to make
> > > > more poll calls when that value is set to 100, and because of the
> > > > non-blocking nature I believe the cost of the extra poll calls is not
> > > > significant.
> > >
> > >
> > >
> > > Further thoughts?
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: Matt Farmer <ma...@frmr.me>>
> > >
> > > Sent: Monday, November 19, 2018 9:32 PM
> > >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi there,
> > >
> > >
> > >
> > > Thanks for the KIP.
> > >
> > >
> > >
> > > We’ve run into issues with this at Mailchimp so something to address
> > > consuming behavior would save us from having to always ensure we’re
> > > running enough consumers that each consumer has only one partition
> > > (which is our usual MO).
> > >
> > >
> > >
> > > I wonder though if it would be simpler and more powerful to define
> > > the maximum number of records the consumer should pull from one
> > > partition before pulling some records from another?
> > >
> > >
> > >
> > > So if you set max.poll.records to 500 and then some new setting,
> > > max.poll.records.per.partition, to 100 then the Consumer would
> > > switch what partition it reads from every 100 records - looping back
> > > around to the first partition that had records if there aren’t 5 or
> > > more partitions with records.
> > >
> > >
> > >
> > > What do you think?
> > >
> > >
> > >
> > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chienhsw@opentext.com
> > > <ma...@opentext.com>> wrote:
> > >
> > >
> > >
> > > > Hi, could anyone please review this KIP?
> > >
> > > >
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Friday, November 09, 2018 1:10 PM
> > >
> > > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> > >
> > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> > >
> > > > Partitions in KafkaConsumer
> > >
> > > >
> > >
> > > > > Just to check: Will anyone review this? It's been silent for a week...
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Monday, November 05, 2018 4:18 PM
> > >
> > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> > >
> > > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> > >
> > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across
> > > > Partitions
> > >
> > > > in KafkaConsumer
> > >
> > > >
> > >
> > > > Hi I just put together the KIP page as requested. This email is to
> > >
> > > > start the discussion thread.
> > >
> > > >
> > >
> > > > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> > > >
> > > > > KafkaConsumer<
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=
> > > >
> > > > > >
> > > >
> > > > > Pull Request:
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> > > >
> > > > > Jira:
> > > >
> > > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> > >
> > > >
> > >
> > > > Thanks, CH
> > >
> > > >
> > >
> >
> >


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi Mayuresh,

Thanks for the input!

Pausing and resuming is cumbersome and has an undesirable performance impact, since pausing in effect discards the completed fetch and resuming has to call the broker to retrieve the data again.
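
To put rough numbers on that cost, here is a toy model in Python. It is not Kafka code: the function names, the `fetch_size` parameter and the assumption that every pause throws away the paused partition's buffered records are illustrative only, following the behaviour as I described it above.

```python
def fetches_with_pause_resume(partitions, records_each, max_poll_records, fetch_size):
    """Count polls and broker fetches when the application rotates pause()/resume().

    Assumption: pausing a partition discards its buffered records, so each time
    a partition is resumed its data has to be fetched from the broker again.
    """
    position = {p: 0 for p in partitions}    # next offset to consume, per partition
    polls = 0
    fetches = 0
    while any(position[p] < records_each for p in partitions):
        for p in partitions:                 # resume p, pause all the others
            if position[p] >= records_each:
                continue
            fetches += 1                     # buffer for p was dropped while paused
            buffered = min(fetch_size, records_each - position[p])
            position[p] += min(max_poll_records, buffered)
            polls += 1                       # the unread remainder is dropped on pause
    return polls, fetches


def fetches_with_buffer_kept(partitions, records_each, fetch_size):
    """Broker fetches needed when buffered data survives between polls."""
    per_partition = -(-records_each // fetch_size)   # ceiling division
    return per_partition * len(partitions)


print(fetches_with_pause_resume(["A", "B"], 300, 100, 300))
print(fetches_with_buffer_kept(["A", "B"], 300, 300))
```

Under these assumptions, two partitions with 300 records each and max.poll.records = 100 need one broker fetch per poll with pause/resume, while rotating inside the consumer needs only one fetch per partition.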

The way I changed the code was simply to parse the completed fetches earlier and ensure that records are retrieved in the same order as the completed fetch queue. I made changes in the Fetcher class to take the following into account:

1) exception handling
2) ensure the parsed partitions are not included in fetchablePartitions
3) clear buffered data for partitions that are not in the new assignment, in clearBufferedDataForUnassignedPartitions
4) close the parsed fetches properly in the close method

Though the consumer does not guarantee an explicit order, KIP-41 (link below) did intend to ensure fair consumption, hence the round robin algorithm in the code. The change I propose is meant to enhance it.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption

As for performance, the change does not add any calls to the broker, nor does it introduce significant processing logic; it just parses the completed fetches earlier and keeps a list to manage them.


CH

-----Original Message-----
From: Mayuresh Gharat <gh...@gmail.com> 
Sent: Tuesday, December 11, 2018 6:58 PM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing,

Another way I was thinking this could be done, outside of KafkaConsumer, is by pausing and resuming TopicPartitions (maybe in round robin fashion).
There is a gotcha there, in that you might not know whether the consumer has already fetched data for the remaining partitions.
Also, I am not sure we need a KIP for this, as the KafkaConsumer does not guarantee the end user any order, I believe. So if this change goes in, I don't think it's changing the underlying behavior.
It would be good to check whether this change will impact the performance of the consumer.

Thanks,

Mayuresh


On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <ch...@opentext.com>
wrote:

> Hi Mayuresh,
>
> To serve one poll call the logic greedily gets records from one 
> completed fetch before including records from the next completed fetch 
> from the queue, as you described.
>
> The algorithm remembers the current completed fetch as the starting one
> when serving the next poll call. The net effect is that one completed
> fetch serves as many poll calls as it can before records are retrieved
> from any other completed fetch.
>
> For example, let's say the consumer has been assigned partitions A, B
> and C and max.poll.records is set to 100. Right now we have completed
> fetches for A and B, each with 300 records. It will take 6 poll calls
> to retrieve all records, and the sequence of retrieved partitions will
> be: A, A, A, B, B, B.
>
> Ideally, it should alternate between A and B. I was proposing to move
> to the next completed fetch for the next poll call, based on the order
> in the completed fetch queue, so the order becomes A, B, A, B, A, B. The
> implementation parses each completed fetch only once.
>
> Thanks, CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Tuesday, December 11, 2018 1:21 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> Thanks for the KIP.
> It would be great if you can explain with an example what you mean by
> "Currently the implementation will return available records starting
> from the last partition the last poll call retrieves records from.
> This leads to unfair patterns of record consumption from multiple
> partitions."
>
> KafkaConsumer sends fetch requests to multiple brokers, gets the
> corresponding responses and puts them into a single queue of
> CompletedFetches. It then iterates over this queue of completed
> fetches and peels off up to max.poll.records records from each
> completedFetch for each poll() before moving on to the next
> completedFetch. Also, it does not send a fetch request for a
> TopicPartition if we already have buffered data (completedFetch or
> nextInLineRecords) for that TopicPartition. It also moves the
> TopicPartition to the end of the assignment queue once it has
> received data from the broker for that TopicPartition, to maintain a
> round robin fetch sequence for fairness.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <ch...@opentext.com>
> wrote:
>
> > Jason,
> >
> >
> >
> > KIP 41 was initiated by you and this KIP is to change the logic
> > discussed in the Ensure Fair Consumption<
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > >.
> > Your input on KIP-387<
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=>
> > would be very valuable.
> >
> >
> >
> > Thanks, ChienHsing
> >
> >
> >
> > -----Original Message-----
> > From: ChienHsing Wu <ch...@opentext.com>
> > Sent: Tuesday, December 04, 2018 11:43 AM
> > To: dev@kafka.apache.org
> > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi,
> >
> >
> >
> > Any comments/updates? I am not sure the next steps if no one has any 
> > further comments.
> >
> >
> >
> > Thanks, CH
> >
> >
> >
> > -----Original Message-----
> >
> > From: ChienHsing Wu
> > <ch...@opentext.com>>
> >
> > Sent: Tuesday, November 20, 2018 2:46 PM
> >
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi Matt,
> >
> >
> >
> > Thanks for the feedback.
> >
> >
> >
> > The issue with the current design is that it stays on the previous 
> > partition even if the last poll call consumes the max.poll.records; 
> > it will consume all records in that partition available at the 
> > consumer side to serve multiple poll calls before moving to the next partition.
> >
> >
> >
> > > Introducing another threshold at partition level will decrease the
> > > number of records consumed in one partition within one poll call but
> > > will still use that same partition as the starting one in the next
> > > poll call.
> >
> >
> >
> > > The same effect can be achieved by setting max.poll.records to 100 I
> > > believe. The main difference is that the client will need to make
> > > more poll calls when that value is set to 100, and because of the
> > > non-blocking nature I believe the cost of the extra poll calls is not
> > > significant.
> >
> >
> >
> > Further thoughts?
> >
> >
> >
> > Thanks, CH
> >
> >
> >
> > -----Original Message-----
> >
> > From: Matt Farmer <ma...@frmr.me>>
> >
> > Sent: Monday, November 19, 2018 9:32 PM
> >
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi there,
> >
> >
> >
> > Thanks for the KIP.
> >
> >
> >
> > We’ve run into issues with this at Mailchimp so something to address 
> > consuming behavior would save us from having to always ensure we’re 
> > running enough consumers that each consumer has only one partition 
> > (which is our usual MO).
> >
> >
> >
> > I wonder though if it would be simpler and more powerful to define 
> > the maximum number of records the consumer should pull from one 
> > partition before pulling some records from another?
> >
> >
> >
> > So if you set max.poll.records to 500 and then some new setting, 
> > max.poll.records.per.partition, to 100 then the Consumer would 
> > switch what partition it reads from every 100 records - looping back 
> > around to the first partition that had records if there aren’t 5 or 
> > more partitions with records.
> >
> >
> >
> > What do you think?
> >
> >
> >
> > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chienhsw@opentext.com 
> > <ma...@opentext.com>> wrote:
> >
> >
> >
> > > Hi, could anyone please review this KIP?
> >
> > >
> >
> > > Thanks, ChienHsing
> >
> > >
> >
> > > From: ChienHsing Wu
> >
> > > Sent: Friday, November 09, 2018 1:10 PM
> >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> >
> > > Partitions in KafkaConsumer
> >
> > >
> >
> > > Just to check: Will anyone review this? It's been silent for a week...
> >
> > > Thanks, ChienHsing
> >
> > >
> >
> > > From: ChienHsing Wu
> >
> > > Sent: Monday, November 05, 2018 4:18 PM
> >
> > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> >
> > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> >
> > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > > Partitions
> >
> > > in KafkaConsumer
> >
> > >
> >
> > > Hi I just put together the KIP page as requested. This email is to
> >
> > > start the discussion thread.
> >
> > >
> >
> > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> >
> > > KafkaConsumer<
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=
> >
> > > >
> >
> > > Pull Request:
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> >
> > > Jira:
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> >
> > >
> >
> > > Thanks, CH
> >
> > >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


--
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by Mayuresh Gharat <gh...@gmail.com>.
Hi ChienHsing,

Another way I was thinking this could be done, outside of KafkaConsumer, is
by pausing and resuming TopicPartitions (maybe in round robin fashion).
There is a gotcha there, in that you might not know whether the consumer has
already fetched data for the remaining partitions.
Also, I am not sure we need a KIP for this, as the KafkaConsumer does not
guarantee the end user any order, I believe. So if this change goes in, I
don't think it's changing the underlying behavior.
It would be good to check whether this change will impact the performance of
the consumer.

Thanks,

Mayuresh


On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <ch...@opentext.com>
wrote:

> Hi Mayuresh,
>
> To serve one poll call the logic greedily gets records from one completed
> fetch before including records from the next completed fetch from the
> queue, as you described.
>
> The algorithm remembers the current completed fetch as the starting one
> when serving the next poll call. The net effect is that one completed
> fetch serves as many poll calls as it can before records are retrieved
> from any other completed fetch.
>
> For example, let's say the consumer has been assigned partitions A, B
> and C and max.poll.records is set to 100. Right now we have completed
> fetches for A and B, each with 300 records. It will take 6 poll calls
> to retrieve all records, and the sequence of retrieved partitions will
> be: A, A, A, B, B, B.
>
> Ideally, it should alternate between A and B. I was proposing to move
> to the next completed fetch for the next poll call, based on the order
> in the completed fetch queue, so the order becomes A, B, A, B, A, B. The
> implementation parses each completed fetch only once.
>
> Thanks, CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gh...@gmail.com>
> Sent: Tuesday, December 11, 2018 1:21 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> Thanks for the KIP.
> It would be great if you can explain with an example what you mean by
> "Currently the implementation will return available records starting
> from the last partition the last poll call retrieves records from.
> This leads to unfair patterns of record consumption from multiple
> partitions."
>
> KafkaConsumer sends fetch requests to multiple brokers, gets the
> corresponding responses and puts them into a single queue of
> CompletedFetches. It then iterates over this queue of completed
> fetches and peels off up to max.poll.records records from each
> completedFetch for each poll() before moving on to the next
> completedFetch. Also, it does not send a fetch request for a
> TopicPartition if we already have buffered data (completedFetch or
> nextInLineRecords) for that TopicPartition. It also moves the
> TopicPartition to the end of the assignment queue once it has
> received data from the broker for that TopicPartition, to maintain a
> round robin fetch sequence for fairness.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <ch...@opentext.com>
> wrote:
>
> > Jason,
> >
> >
> >
> > KIP 41 was initiated by you and this KIP is to change the logic
> > discussed in the Ensure Fair Consumption<
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=
> > >.
> > Your input on KIP-387<
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=>
> > would be very valuable.
> >
> >
> >
> > Thanks, ChienHsing
> >
> >
> >
> > -----Original Message-----
> > From: ChienHsing Wu <ch...@opentext.com>
> > Sent: Tuesday, December 04, 2018 11:43 AM
> > To: dev@kafka.apache.org
> > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > Consumption Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi,
> >
> >
> >
> > Any comments/updates? I am not sure the next steps if no one has any
> > further comments.
> >
> >
> >
> > Thanks, CH
> >
> >
> >
> > -----Original Message-----
> >
> > From: ChienHsing Wu
> > <ch...@opentext.com>>
> >
> > Sent: Tuesday, November 20, 2018 2:46 PM
> >
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message
> > Consumption Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi Matt,
> >
> >
> >
> > Thanks for the feedback.
> >
> >
> >
> > The issue with the current design is that it stays on the previous
> > partition even if the last poll call consumes the max.poll.records; it
> > will consume all records in that partition available at the consumer
> > side to serve multiple poll calls before moving to the next partition.
> >
> >
> >
> > Introducing another threshold at partition level will decrease the
> > number of records consumed in one partition within one poll call but
> > will still use that same partition as the starting one in the next
> > poll call.
> >
> >
> >
> > The same effect can be achieved by setting max.poll.records to 100 I
> > believe. The main difference is that the client will need to make more
> > poll calls when that value is set to 100, and because of the
> > non-blocking nature I believe the cost of the extra poll calls is not
> > significant.
> >
> >
> >
> > Further thoughts?
> >
> >
> >
> > Thanks, CH
> >
> >
> >
> > -----Original Message-----
> >
> > From: Matt Farmer <ma...@frmr.me>>
> >
> > Sent: Monday, November 19, 2018 9:32 PM
> >
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> > Across Partitions in KafkaConsumer
> >
> >
> >
> > Hi there,
> >
> >
> >
> > Thanks for the KIP.
> >
> >
> >
> > We’ve run into issues with this at Mailchimp so something to address
> > consuming behavior would save us from having to always ensure we’re
> > running enough consumers that each consumer has only one partition
> > (which is our usual MO).
> >
> >
> >
> > I wonder though if it would be simpler and more powerful to define the
> > maximum number of records the consumer should pull from one partition
> > before pulling some records from another?
> >
> >
> >
> > So if you set max.poll.records to 500 and then some new setting,
> > max.poll.records.per.partition, to 100 then the Consumer would switch
> > what partition it reads from every 100 records - looping back around
> > to the first partition that had records if there aren’t 5 or more
> > partitions with records.
> >
> >
> >
> > What do you think?
> >
> >
> >
> > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chienhsw@opentext.com
> > <ma...@opentext.com>> wrote:
> >
> >
> >
> > > Hi, could anyone please review this KIP?
> >
> > >
> >
> > > Thanks, ChienHsing
> >
> > >
> >
> > > From: ChienHsing Wu
> >
> > > Sent: Friday, November 09, 2018 1:10 PM
> >
> > > To: dev@kafka.apache.org<ma...@kafka.apache.org>
> >
> > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> >
> > > Partitions in KafkaConsumer
> >
> > >
> >
> > > Just to check: Will anyone review this? It's been silent for a week...
> >
> > > Thanks, ChienHsing
> >
> > >
> >
> > > From: ChienHsing Wu
> >
> > > Sent: Monday, November 05, 2018 4:18 PM
> >
> > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> >
> > > dev@kafka.apache.org<ma...@kafka.apache.org>>>
> >
> > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across
> > > Partitions
> >
> > > in KafkaConsumer
> >
> > >
> >
> > > Hi I just put together the KIP page as requested. This email is to
> >
> > > start the discussion thread.
> >
> > >
> >
> > > KIP: KIP-387: Fair Message Consumption Across Partitions in
> >
> > > KafkaConsumer<
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF549_KU&e=
> >
> > > >
> >
> > > Pull Request:
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> >
> > > Jira:
> >
> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
> >
> > >
> >
> > > Thanks, CH
> >
> > >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi Mayuresh,

To serve one poll call the logic greedily gets records from one completed fetch before including records from the next completed fetch from the queue, as you described. 

The algorithm remembers the current completed fetch as the starting one when serving the next poll call. The net effect is that one completed fetch serves as many poll calls as it can before records are retrieved from any other completed fetch.

For example, let's say the consumer has been assigned partitions A, B and C and max.poll.records is set to 100. Right now we have completed fetches for A and B, each with 300 records. It will take 6 poll calls to retrieve all records, and the sequence of retrieved partitions will be: A, A, A, B, B, B.

Ideally, it should alternate between A and B. I was proposing to move to the next completed fetch for the next poll call, based on the order in the completed fetch queue, so the order becomes A, B, A, B, A, B. The implementation parses each completed fetch only once.
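
The two orderings in this example can be reproduced with a small Python simulation. This is only a sketch of the queue behaviour, not the Fetcher code: the `drain` function and its `rotate` flag are names made up for illustration, and it simplifies by letting each poll take records from the head fetch only (which is exact when every fetch holds a multiple of max.poll.records, as in the example).

```python
from collections import deque


def drain(fetches, max_poll_records, rotate):
    """Return the partition served by each poll until the buffer is empty.

    fetches: list of (partition, record_count) in completed fetch queue order.
    rotate=False: stay on the head fetch until it is exhausted (current logic).
    rotate=True:  after each poll, move the head fetch to the back (proposal).
    """
    queue = deque([partition, count] for partition, count in fetches)
    order = []
    while queue:
        head = queue[0]
        taken = min(max_poll_records, head[1])
        head[1] -= taken
        order.append(head[0])
        if head[1] == 0:
            queue.popleft()          # fetch fully consumed, drop it
        elif rotate:
            queue.rotate(-1)         # head goes to the back of the queue
    return order


completed = [("A", 300), ("B", 300)]
print(drain(completed, 100, rotate=False))
print(drain(completed, 100, rotate=True))
```

Both variants consume each buffered fetch exactly once and need the same six poll calls; only the order in which partitions are served changes.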

Thanks, CH

-----Original Message-----
From: Mayuresh Gharat <gh...@gmail.com> 
Sent: Tuesday, December 11, 2018 1:21 PM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi ChienHsing,

Thanks for the KIP.
It would be great if you can explain with an example what you mean by "Currently the implementation will return available records starting from the last partition the last poll call retrieves records from. This leads to unfair patterns of record consumption from multiple partitions."

KafkaConsumer sends fetch requests to multiple brokers, gets the corresponding responses and puts them into a single queue of CompletedFetches. It then iterates over this queue of completed fetches and peels off up to max.poll.records records from each completedFetch for each poll() before moving on to the next completedFetch. Also, it does not send a fetch request for a TopicPartition if we already have buffered data (completedFetch or nextInLineRecords) for that TopicPartition. It also moves the TopicPartition to the end of the assignment queue once it has received data from the broker for that TopicPartition, to maintain a round robin fetch sequence for fairness.

Thanks,

Mayuresh

On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Jason,
>
>
>
> KIP 41 was initiated by you and this KIP is to change the logic 
> discussed in the Ensure Fair Consumption< 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D41-253A-2BKafkaConsumer-2BMax-2BRecords-23KIP-2D41-3AKafkaConsumerMaxRecords-2DEnsuringFairConsumption&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=warXH2nttWvhdQhn-oSZuBYfZ_V2OY5ikbksVMzbt9o&e=>.
> Your input on KIP-387<
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption-2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=jeijHrRehjaysSML7ZSVlVEepS5LWchozwVVbwp7TLA&s=Ptfb85HFvz0TqKSju21-_uV-U_0_HHNlnNf0kTtRlgk&e=>
> would be very valuable.
>
>
>
> Thanks, ChienHsing
>
>
>
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Tuesday, December 04, 2018 11:43 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
>
>
>
> Hi,
>
>
>
> Any comments/updates? I am not sure the next steps if no one has any 
> further comments.
>
>
>
> Thanks, CH
>
>
>
> -----Original Message-----
>
> From: ChienHsing Wu 
> <ch...@opentext.com>>
>
> Sent: Tuesday, November 20, 2018 2:46 PM
>
> To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
>
>
>
> Hi Matt,
>
>
>
> Thanks for the feedback.
>
>
>
> The issue with the current design is that it stays on the previous 
> partition even if the last poll call consumes the max.poll.records; it 
> will consume all records in that partition available at the consumer 
> side to serve multiple poll calls before moving to the next partition.
>
>
>
> Introducing another threshold at partition level will decrease the 
> number of records consumed in one partition within one poll call but 
> will still use that same partition as the starting one in the next poll call.
>
>
>
> The same effect can be achieved by setting max.poll.records to 100 I 
> believe. The main difference is that the client will need to make more 
> poll calls when that value is set to 100, and because of the 
> non-blocking nature I believe the cost of extra poll calls are not significant.
>
>
>
> Further thoughts?
>
>
>
> Thanks, CH
>
>
>
> -----Original Message-----
>
> From: Matt Farmer <ma...@frmr.me>>
>
> Sent: Monday, November 19, 2018 9:32 PM
>
> To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
> Across Partitions in KafkaConsumer
>
>
>
> Hi there,
>
>
>
> Thanks for the KIP.
>
>
>
> We’ve run into issues with this at Mailchimp so something to address 
> consuming behavior would save us from having to always ensure we’re 
> running enough consumers that each consumer has only one partition 
> (which is our usual MO).
>
>
>
> I wonder though if it would be simpler and more powerful to define the 
> maximum number of records the consumer should pull from one partition 
> before pulling some records from another?
>
>
>
> So if you set max.poll.records to 500 and then some new setting, 
> max.poll.records.per.partition, to 100 then the Consumer would switch 
> what partition it reads from every 100 records - looping back around 
> to the first partition that had records if there aren’t 5 or more 
> partitions with records.
>
>
>
> What do you think?
>
>
>
> On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chienhsw@opentext.com 
> <ma...@opentext.com>> wrote:
>
>
>
> > Hi, could anyone please review this KIP?
>
> >
>
> > Thanks, ChienHsing
>
> >
>
> > From: ChienHsing Wu
>
> > Sent: Friday, November 09, 2018 1:10 PM
>
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
>
> > Partitions in KafkaConsumer
>
> >
>
> > Just to check: Will anyone review this? It's been silent for a week...
>
> > Thanks, ChienHsing
>
> >
>
> > From: ChienHsing Wu
>
> > Sent: Monday, November 05, 2018 4:18 PM
>
> > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
>
> > dev@kafka.apache.org<ma...@kafka.apache.org>>>
>
> > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across 
> > Partitions
>
> > in KafkaConsumer
>
> >
>
> > Hi I just put together the KIP page as requested. This email is to
>
> > start the discussion thread.
>
> >
>
> > KIP: KIP-387: Fair Message Consumption Across Partitions in
>
> > KafkaConsumer<
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.or
> > g_
>
> > confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumpti
> > on
>
> > -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-
> > XD
>
> > AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8Xi
> > oE
>
> > m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1b
> > F5
>
> > 49_KU&e=
>
> > >
>
> > Pull Request:
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apac
> > he
>
> > _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0
> > OF
>
> > yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK
> > 7L
>
> > X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
>
> > Jira:
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.o
> > rg
>
> > _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wM
> > rb
>
> > L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8Y
> > rz
>
> > Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
>
> >
>
> > Thanks, CH
>
> >
>


--
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by Mayuresh Gharat <gh...@gmail.com>.
Hi ChienHsing,

Thanks for the KIP.
It would be great if you could explain, with an example, what you mean by "
Currently the implementation will return available records starting from
the last partition the last poll call retrieves records from. This leads to
unfair patterns of record consumption from multiple partitions."

KafkaConsumer sends fetch requests to multiple brokers, receives the
corresponding responses, and puts them into a single queue of
CompletedFetches. It then iterates over this queue and peels off up to
max.poll.records records from each completedFetch on each poll() before
moving on to the next completedFetch. Also, it does not send a fetch request
for a TopicPartition if it already has buffered data (completedFetch or
nextInlineRecord) for that TopicPartition. Once it has received data from
the broker for a TopicPartition, it also moves that TopicPartition to the
end of the assignment queue, to maintain a round-robin fetch sequence for
fairness.
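
To make the draining behavior described above concrete, here is a minimal, self-contained sketch. It is not the actual Fetcher code: the class CompletedFetchDrainSketch, the BufferedFetch type, and the record counts are all hypothetical, and the model only assumes that each poll() takes up to max.poll.records records from the head of a single queue before moving to the next entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CompletedFetchDrainSketch {

    /** Hypothetical stand-in for one buffered fetch response of a single partition. */
    static final class BufferedFetch {
        final String partition;
        int remaining;

        BufferedFetch(String partition, int remaining) {
            this.partition = partition;
            this.remaining = remaining;
        }
    }

    /**
     * Simulates one poll(): drains up to maxPollRecords records, always starting
     * at the head of the queue. Returns "partition:count" entries in serving order.
     */
    static List<String> poll(Deque<BufferedFetch> queue, int maxPollRecords) {
        List<String> served = new ArrayList<>();
        int budget = maxPollRecords;
        while (budget > 0 && !queue.isEmpty()) {
            BufferedFetch head = queue.peekFirst();
            int taken = Math.min(budget, head.remaining);
            head.remaining -= taken;
            budget -= taken;
            served.add(head.partition + ":" + taken);
            if (head.remaining == 0) {
                queue.pollFirst(); // this fetch is exhausted, move to the next one
            }
        }
        return served;
    }

    public static void main(String[] args) {
        Deque<BufferedFetch> queue = new ArrayDeque<>();
        queue.add(new BufferedFetch("topic-0", 1200)); // assumed buffered counts
        queue.add(new BufferedFetch("topic-1", 300));
        for (int i = 1; i <= 3; i++) {
            System.out.println("poll " + i + " -> " + poll(queue, 500));
        }
    }
}
```

In this model the first two polls are served entirely from topic-0, and topic-1 is only reached on the third poll, which seems to be the pattern the KIP describes as unfair.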

Thanks,

Mayuresh

On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Jason,
>
>
>
> KIP 41 was initiated by you and this KIP is to change the logic discussed
> in the Ensure Fair Consumption<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption>.
> Your input on KIP-387<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer>
> would be very valuable.
>
>
>
> Thanks, ChienHsing
>
>
>
> -----Original Message-----
> From: ChienHsing Wu <ch...@opentext.com>
> Sent: Tuesday, December 04, 2018 11:43 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
>
>
> Hi,
>
>
>
> Any comments/updates? I am not sure the next steps if no one has any
> further comments.
>
>
>
> Thanks, CH
>
>
>
> -----Original Message-----
>
> From: ChienHsing Wu <ch...@opentext.com>>
>
> Sent: Tuesday, November 20, 2018 2:46 PM
>
> To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
>
>
> Hi Matt,
>
>
>
> Thanks for the feedback.
>
>
>
> The issue with the current design is that it stays on the previous
> partition even if the last poll call consumes the max.poll.records; it will
> consume all records in that partition available at the consumer side to
> serve multiple poll calls before moving to the next partition.
>
>
>
> Introducing another threshold at partition level will decrease the number
> of records consumed in one partition within one poll call but will still
> use that same partition as the starting one in the next poll call.
>
>
>
> The same effect can be achieved by setting max.poll.records to 100 I
> believe. The main difference is that the client will need to make more poll
> calls when that value is set to 100, and because of the non-blocking nature
> I believe the cost of extra poll calls are not significant.
>
>
>
> Further thoughts?
>
>
>
> Thanks, CH
>
>
>
> -----Original Message-----
>
> From: Matt Farmer <ma...@frmr.me>>
>
> Sent: Monday, November 19, 2018 9:32 PM
>
> To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption
> Across Partitions in KafkaConsumer
>
>
>
> Hi there,
>
>
>
> Thanks for the KIP.
>
>
>
> We’ve run into issues with this at Mailchimp so something to address
> consuming behavior would save us from having to always ensure we’re running
> enough consumers that each consumer has only one partition (which is our
> usual MO).
>
>
>
> I wonder though if it would be simpler and more powerful to define the
> maximum number of records the consumer should pull from one partition
> before pulling some records from another?
>
>
>
> So if you set max.poll.records to 500 and then some new setting,
> max.poll.records.per.partition, to 100 then the Consumer would switch what
> partition it reads from every 100 records - looping back around to the
> first partition that had records if there aren’t 5 or more partitions with
> records.
>
>
>
> What do you think?
>
>
>
> On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chienhsw@opentext.com
> <ma...@opentext.com>> wrote:
>
>
>
> > Hi, could anyone please review this KIP?
>
> >
>
> > Thanks, ChienHsing
>
> >
>
> > From: ChienHsing Wu
>
> > Sent: Friday, November 09, 2018 1:10 PM
>
> > To: dev@kafka.apache.org<ma...@kafka.apache.org>
>
> > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
>
> > Partitions in KafkaConsumer
>
> >
>
> > Just to check: Will anyone review this? It's been silent for a week...
>
> > Thanks, ChienHsing
>
> >
>
> > From: ChienHsing Wu
>
> > Sent: Monday, November 05, 2018 4:18 PM
>
> > To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
>
> > dev@kafka.apache.org<ma...@kafka.apache.org>>>
>
> > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions
>
> > in KafkaConsumer
>
> >
>
> > Hi I just put together the KIP page as requested. This email is to
>
> > start the discussion thread.
>
> >
>
> > KIP: KIP-387: Fair Message Consumption Across Partitions in
>
> > KafkaConsumer<
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_
>
> > confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption
>
> > -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XD
>
> > AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioE
>
> > m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF5
>
> > 49_KU&e=
>
> > >
>
> > Pull Request:
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache
>
> > _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OF
>
> > yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7L
>
> > X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
>
> > Jira:
>
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org
>
> > _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrb
>
> > L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8Yrz
>
> > Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
>
> >
>
> > Thanks, CH
>
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Jason,



KIP 41 was initiated by you and this KIP is to change the logic discussed in the Ensure Fair Consumption<https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption>. Your input on KIP-387<https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer> would be very valuable.



Thanks, ChienHsing



-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com>
Sent: Tuesday, December 04, 2018 11:43 AM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer



Hi,



Any comments/updates? I am not sure of the next steps if no one has any further comments.



Thanks, CH



-----Original Message-----

From: ChienHsing Wu <ch...@opentext.com>>

Sent: Tuesday, November 20, 2018 2:46 PM

To: dev@kafka.apache.org<ma...@kafka.apache.org>

Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer



Hi Matt,



Thanks for the feedback.



The issue with the current design is that it stays on the previous partition even if the last poll call consumes the max.poll.records; it will consume all records in that partition available at the consumer side to serve multiple poll calls before moving to the next partition.



Introducing another threshold at partition level will decrease the number of records consumed in one partition within one poll call but will still use that same partition as the starting one in the next poll call.



I believe the same effect can be achieved by setting max.poll.records to 100. The main difference is that the client will need to make more poll calls when that value is set to 100, and because of the non-blocking nature of poll I believe the cost of the extra calls is not significant.



Further thoughts?



Thanks, CH



-----Original Message-----

From: Matt Farmer <ma...@frmr.me>>

Sent: Monday, November 19, 2018 9:32 PM

To: dev@kafka.apache.org<ma...@kafka.apache.org>

Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer



Hi there,



Thanks for the KIP.



We’ve run into issues with this at Mailchimp so something to address consuming behavior would save us from having to always ensure we’re running enough consumers that each consumer has only one partition (which is our usual MO).



I wonder though if it would be simpler and more powerful to define the maximum number of records the consumer should pull from one partition before pulling some records from another?



So if you set max.poll.records to 500 and then some new setting, max.poll.records.per.partition, to 100 then the Consumer would switch what partition it reads from every 100 records - looping back around to the first partition that had records if there aren’t 5 or more partitions with records.



What do you think?



On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <ch...@opentext.com>> wrote:



> Hi, could anyone please review this KIP?

>

> Thanks, ChienHsing

>

> From: ChienHsing Wu

> Sent: Friday, November 09, 2018 1:10 PM

> To: dev@kafka.apache.org<ma...@kafka.apache.org>

> Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across

> Partitions in KafkaConsumer

>

> Just to check: Will anyone review this? It's been silent for a week...

> Thanks, ChienHsing

>

> From: ChienHsing Wu

> Sent: Monday, November 05, 2018 4:18 PM

> To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:

> dev@kafka.apache.org<ma...@kafka.apache.org>>>

> Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions

> in KafkaConsumer

>

> Hi I just put together the KIP page as requested. This email is to

> start the discussion thread.

>

> KIP: KIP-387: Fair Message Consumption Across Partitions in

> KafkaConsumer<

> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_

> confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption

> -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XD

> AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioE

> m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF5

> 49_KU&e=

> >

> Pull Request:

> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache

> _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OF

> yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7L

> X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=

> Jira:

> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org

> _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrb

> L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8Yrz

> Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=

>

> Thanks, CH

>

RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Posted by ChienHsing Wu <ch...@opentext.com>.
Hi,

Any comments/updates? I am not sure of the next steps if no one has any further comments.

Thanks, CH

-----Original Message-----
From: ChienHsing Wu <ch...@opentext.com> 
Sent: Tuesday, November 20, 2018 2:46 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi Matt,

Thanks for the feedback. 

The issue with the current design is that it stays on the previous partition even if the last poll call consumes the max.poll.records; it will consume all records in that partition available at the consumer side to serve multiple poll calls before moving to the next partition. 

Introducing another threshold at partition level will decrease the number of records consumed in one partition within one poll call but will still use that same partition as the starting one in the next poll call. 

I believe the same effect can be achieved by setting max.poll.records to 100. The main difference is that the client will need to make more poll calls when that value is set to 100, and because of the non-blocking nature of poll I believe the cost of the extra calls is not significant.
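
For illustration, lowering max.poll.records as suggested above might look like the following. This is only a sketch: the configuration keys are the standard consumer property names, while the class name SmallPollBatchConfig, the broker address, and the group id are placeholders that are not part of this thread.

```java
import java.util.Properties;

public class SmallPollBatchConfig {

    /** Builds consumer properties with a reduced max.poll.records value. */
    static Properties consumerProps(int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group id
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Fewer records per poll() means more poll() calls for the same data.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(100));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor. Note that this only shrinks each batch; it does not change which partition the next poll call starts from.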

Further thoughts?

Thanks, CH

-----Original Message-----
From: Matt Farmer <ma...@frmr.me>
Sent: Monday, November 19, 2018 9:32 PM
To: dev@kafka.apache.org
Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions in KafkaConsumer

Hi there,

Thanks for the KIP.

We’ve run into issues with this at Mailchimp so something to address consuming behavior would save us from having to always ensure we’re running enough consumers that each consumer has only one partition (which is our usual MO).

I wonder though if it would be simpler and more powerful to define the maximum number of records the consumer should pull from one partition before pulling some records from another?

So if you set max.poll.records to 500 and then some new setting, max.poll.records.per.partition, to 100 then the Consumer would switch what partition it reads from every 100 records - looping back around to the first partition that had records if there aren’t 5 or more partitions with records.
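
A rough sketch of how the proposed per-partition cap could behave within a single poll call is shown below. The setting max.poll.records.per.partition does not exist in Kafka; it is the hypothetical setting from this proposal, and the class PerPartitionCapSketch, the method signature, and the buffered counts are likewise assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionCapSketch {

    /**
     * Simulates one poll() that switches partition after perPartitionCap records,
     * looping back to earlier partitions while the overall budget allows.
     * The buffered map (partition -> buffered record count) is updated in place.
     */
    static List<String> poll(Map<String, Integer> buffered, int maxPollRecords, int perPartitionCap) {
        if (perPartitionCap <= 0) {
            throw new IllegalArgumentException("perPartitionCap must be positive");
        }
        Deque<String> order = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            if (e.getValue() > 0) {
                order.add(e.getKey());
            }
        }
        List<String> served = new ArrayList<>();
        int budget = maxPollRecords;
        while (budget > 0 && !order.isEmpty()) {
            String partition = order.pollFirst();
            int remaining = buffered.get(partition);
            int taken = Math.min(Math.min(perPartitionCap, budget), remaining);
            buffered.put(partition, remaining - taken);
            budget -= taken;
            served.add(partition + ":" + taken);
            if (remaining - taken > 0) {
                order.addLast(partition); // still has records, revisit it later
            }
        }
        return served;
    }

    public static void main(String[] args) {
        Map<String, Integer> buffered = new LinkedHashMap<>();
        buffered.put("topic-0", 1000); // assumed buffered counts
        buffered.put("topic-1", 150);
        System.out.println(poll(buffered, 500, 100));
    }
}
```

With only two partitions holding records, the loop returns to the first partition repeatedly until the 500-record budget is used up, matching the "looping back around" case in the proposal.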

What do you think?

On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <ch...@opentext.com> wrote:

> Hi, could anyone please review this KIP?
>
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Friday, November 09, 2018 1:10 PM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across 
> Partitions in KafkaConsumer
>
> Just to check: Will anyone review this? It's been silent for a week...
> Thanks, ChienHsing
>
> From: ChienHsing Wu
> Sent: Monday, November 05, 2018 4:18 PM
> To: 'dev@kafka.apache.org' <dev@kafka.apache.org<mailto:
> dev@kafka.apache.org>>
> Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions 
> in KafkaConsumer
>
> Hi I just put together the KIP page as requested. This email is to 
> start the discussion thread.
>
> KIP: KIP-387: Fair Message Consumption Across Partitions in 
> KafkaConsumer< 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_
> confluence_display_KAFKA_KIP-2D387-253A-2BFair-2BMessage-2BConsumption
> -2BAcross-2BPartitions-2Bin-2BKafkaConsumer&d=DwIFaQ&c=ZgVRmm3mf2P1-XD
> AyDsu4A&r=Az03wMrbL9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioE
> m16n75UIKYwi8c8YrzVrp5tBK7LX8&s=gBGG4GvzPu-xhQ-uUqlq30U-bzwcKZ_lNP1bF5
> 49_KU&e=
> >
> Pull Request: 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache
> _kafka_pull_5838&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrbL9ToLW0OF
> yo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8YrzVrp5tBK7L
> X8&s=cJ2JGXAUQx4ymtMv_MLtGq7QiUJV3xBzKcS_Nwla08A&e=
> Jira: 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org
> _jira_browse_KAFKA-2D3932&d=DwIFaQ&c=ZgVRmm3mf2P1-XDAyDsu4A&r=Az03wMrb
> L9ToLW0OFyo3wo3985rhAKPMLmmg6RA3V7I&m=HhHPjfcrk8XioEm16n75UIKYwi8c8Yrz
> Vrp5tBK7LX8&s=TfIIF2Ui9YEVxxwAbko0j-fT_mMVHf5Yywapc0w8eEA&e=
>
> Thanks, CH
>