Posted to dev@kafka.apache.org by Andrew Schofield <an...@live.com> on 2023/05/15 11:55:14 UTC

[DISCUSS] KIP-932: Queues for Kafka

Hi,
I would like to start a discussion thread on KIP-932: Queues for Kafka. This KIP proposes an alternative to consumer groups to enable cooperative consumption by consumers without partition assignment. You end up with queue semantics on top of regular Kafka topics, with per-message acknowledgement and automatic handling of messages which repeatedly fail to be processed.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

Please take a look and let me know what you think.

Thanks.
Andrew
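
To make the proposed model concrete, here is a rough sketch of the kind of per-record
acknowledgement loop the KIP describes. The names used below (KafkaShareConsumer,
acknowledge, AcknowledgeType) are illustrative placeholders rather than a finalized API,
and process() stands in for application logic.

    // Sketch only: the share-consumer API shown here is a placeholder, not the KIP's final interface.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-share-group");   // a share group rather than a classic consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
        consumer.subscribe(List.of("payments"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    process(record);                                        // application logic
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);   // successfully processed
                } catch (Exception e) {
                    consumer.acknowledge(record, AcknowledgeType.RELEASE);  // make it available for redelivery
                }
            }
        }
    }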

Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Ashish Chhabria <as...@gmail.com>.
This is such a great proposal. Parallel/cooperative consumption is a long-missing
pattern in Apache Kafka and a viable solution for the common
head-of-line-blocking problem. Developers have long attempted to solve this
with bespoke consumer proxies that manage the state of in-flight payloads, so
this would definitely be helpful to have in Apache Kafka.
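
For context on the head-of-line-blocking point: with a classic consumer group the committed
position is a single offset per partition, so one record that cannot be processed holds back
every record behind it in that partition. A minimal illustration using today's KafkaConsumer,
where consumer is an ordinary consumer-group member and process() stands in for application
logic:

    // Classic consumer group: progress is one committed offset per partition, so a record
    // that hangs or keeps failing at offset N blocks the commit of every offset after N.
    consumer.subscribe(List.of("payments"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record);       // if this never succeeds for one record...
        }
        consumer.commitSync();     // ...the partition's committed offset cannot move past it
    }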


Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Andrew Otto <ot...@wikimedia.org>.
Wow looks very cool.

One Q.

In the Future Work section:
> The concept can be extended to give key-based ordering so that partial
ordering and fine-grained sharing can be achieved at the same time.

I think this will be a pretty important feature needed to make shared
consumer groups useful.  Perhaps it would be worth considering how this
could be implemented and mentioning it in the KIP?






On Mon, May 22, 2023 at 6:20 AM Stanislav Kozlovski
<st...@confluent.io.invalid> wrote:

> Hey Andrew!
>
> Kudos on the proposal. It is very well written - a joy to read. It is
> definitely an interesting solution to the queueing problem - I would not
> have guessed we could solve it like this. Thank you for working on this.
>
> Happy to get the discussion started - I have a few comments/questions on
> first read:
>
> 1. Tiered Storage
>
> I notice no mention of Tiered Storage (KIP-405). Does that complicate the
> design, especially when fetching historical data? It would be good to have
> at least one sentence mentioning it, even if it doesn't impact it. Right
> now I'm unsure if it was considered.
>
> 2. SSO initialized to the latest offset
>
> > "By default, the SSO for each share-partition is initialized to the
> latest offset for the corresponding topic-partitions."
>
> Have we considered allowing this to be configurable to latest/earliest?
> This would be consistent with the auto.offset.reset config of vanilla
> consumer groups.
> Thinking from a user's perspective, it sounds valid to want to start from
> the beginning of a topic when starting a share group. Historical processing
> comes to mind.
>
> 3. Durable Storage
>
> The KIP mentions that "The cluster records this information durably", which
> implies that it saves it somewhere. Does the ShareCoordinator have its own
> topic? Would it be compacted?
>
> In particular, I am interested in what such a topic's retention would be
> like. The vanilla consumer offsets topic has some special retention
> semantics (KIP-211) where we start counting the retention after the
> consumer group becomes empty (inactive) - the default being 7 days. Need to
> make sure the retention here isn't too short either, as the offsets topic
> originally had 24 hours of retention and that proved problematic.
>
> In general, some extra detail about the persistence would be greatly
> appreciated!
>
> 4. Batch Acknowledgement
>
> > "In the situation where some records in a batch have been released or
> rejected separately, subsequent fetches of those records are more likely to
> have gaps."
>
> Can we expand a bit more on this edge case? I am interested in learning
> what gets returned on subsequent fetch requests.
> In particular - how does this work with compression? As far as I remember,
> we can compress the whole batch there, which might make individual record
> filtering tricky.
>
> 5. Member Management
>
> How is consumer group member management handled? I didn't see any specific
> mention - is it the same as a vanilla group?
> In particular - how will bad consumers be handled?
>
> I guess I see two cases:
> 1. bad consumer that doesn't even heartbeat
> 2. bad consumer that heartbeats well but for some reason every message
> processing times out. e.g. imagine it was network partitioned from some
> third-party system that is a critical part of its message processing loop
>
> One evident problem I can foresee in production systems is one (or a few)
> slow consumer applications bringing the SSO/SEO advancement down to a
> crawl.
> Imagine an example where the same consumer app always hits the timeout
> limit - what would the behavior be in such a case? Do we keep that consumer
> app indefinitely (if so, do we run the risk of having it invalidate
> completely valid messages)? Are there any equivalents to the consumer group
> rebalances which fence off such bad consumers?
>
> 6. Processing Semantics (exactly once)
>
> > The delivery counts are only maintained approximately and the Acquired
> state is not persisted.
>
> Does this introduce the risk of zombie consumers on share-partition-leader
> failure? i.e. restarting and giving another consumer the acquired state for
> the same record
>
> I notice that the KIP says:
> > Finally, this KIP does not include support for acknowledging delivery
> using transactions for exactly-once semantics.
> at the very end. It would be helpful to address this earlier in the
> document, as one of the key points. And it would be good to be clearer on
> what the processing semantics are. They seem to be *at-least-once* to me.
>
>
> 7. nit: Acronyms
>
> I feel like SSO and SEO may be bad acronyms. Googling "Kafka SEO" is bound
> to return weird results.
> What do we think about the tradeoff of using more-unique acronyms (like
> SGEO, SSGO) at the expense of one extra letter?
>
> Again - thanks for working on this! I think it's a great initiative. I'm
> excited to see us perfect this proposal and enable a brand new use case in
> Kafka!
>
> Best,
> Stanislav
>
>
>
> --
> Best,
> Stanislav
>
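
On question 6 above (processing semantics): as confirmed later in the thread, delivery is
at-least-once, so applications should expect occasional redelivery of the same record. A
common way to cope is to make processing idempotent, for example by de-duplicating on the
record's topic/partition/offset coordinate. The alreadyProcessed/markProcessed store below is
an application-side assumption, not something defined by the KIP:

    // At-least-once delivery: the same record may arrive more than once, so skip
    // records whose (topic, partition, offset) coordinate has already been handled.
    String coordinate = record.topic() + "-" + record.partition() + "-" + record.offset();
    if (!alreadyProcessed(coordinate)) {   // e.g. a lookup in the application's own store
        process(record);
        markProcessed(coordinate);         // remember the coordinate before acknowledging
    }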

Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Andrew Schofield <an...@live.com>.
Hi Satish,
Thanks for your comments.

101 I am not planning to extend this functionality for queuing semantics like the JMS
point-to-point model in future. While this KIP does make it more viable to create a
relatively thin JMS client that talks the Kafka protocol directly, there’s a lot more needed
to achieve full JMS support. If there’s a specific JMS concept that you had in mind, I’m happy to
discuss more.

102 Before DLQs are introduced, there is no way to know which records were
archived as undeliverable because of reaching the limit of delivery attempts or
explicit rejection. I think DLQ is the most important follow-on to this KIP.

103 I agree that the language around SPSO and SPEO could be improved and stating
what the maximum values are would be helpful. I’ll do that in the next revision.

104 I understand your point but I think I would express it differently. The durable record
of the delivery count might not be incremented in the event of a broker crash or
other failure. The point of the delivery count is not to guarantee exactly 5 delivery attempts.
It’s to make sure that you don’t get an unlimited number of futile attempts for an
unprocessable record. If you very rarely get an additional attempt because there was
a failure to update the durable record of the delivery count, that’s still meeting the overall
aim.

Thanks,
Andrew
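
In other words, share.delivery.count.limit bounds futile redelivery rather than guaranteeing an
exact count. A sketch of the kind of check the share-partition leader might apply when a record
has to be made available again; the config name, the default of 5, and the Acquired and Archived
record states come from this discussion, while the Available state, types and method below are
purely illustrative:

    // Illustrative sketch: bound how many times an unprocessable record is redelivered.
    static final int DELIVERY_COUNT_LIMIT = 5;   // share.delivery.count.limit (group config)

    void afterFailedDeliveryAttempt(InFlightRecord record) {
        record.deliveryCount++;                       // the durable count may occasionally miss an increment
        if (record.deliveryCount >= DELIVERY_COUNT_LIMIT) {
            record.state = RecordState.ARCHIVED;      // give up: no further delivery attempts
        } else {
            record.state = RecordState.AVAILABLE;     // eligible to be acquired again
        }
    }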

> On 29 May 2023, at 05:36, Satish Duggana <sa...@gmail.com> wrote:
>
> Hi Andrew,
> Thanks for the nice KIP on a very interesting feature about
> introducing some of the traditional MessageQueue semantics to Kafka.
> It is good to see that we are extending the existing consumer groups
> concepts and related mechanisms for shared subscriptions instead of
> bringing any large architectural/protocol changes.
>
> This KIP talks about introducing a durable subscription feature for
> topics, with multiple consumers consuming messages in parallel from a
> single topic partition.
>
> 101 Are you planning to extend this functionality for queueing
> semantics like the JMS point-to-point style in the future?
>
> 102 When a message is rejected by the target consumer, how do users
> know which records/offsets were dropped, whether due to a rejection ack
> or due to timeouts etc, before DLQs are
> introduced?
>
> 103 It talks about SPSO values, earliest being the default, and the user
> can reset it to a target offset or timestamp. What is the maximum value
> for SPEO? It is good to clarify what the maximum value could be for
> SPSO and SPEO. Could it be HW or LogStableOffset or some other value?
>
> 104 The KIP mentions "share.delivery.count.limit" as the maximum
> number of delivery attempts for a record delivered to a share group.
> But the actual delivery count may be more than this number, as the
> leader may fail to update the delivery count, or the leader or consumer may
> fail and more delivery attempts may be made later. It may really be the
> minimum number of delivery attempts rather than the maximum number of
> delivery attempts.
>
> Thanks,
> Satish.
>
>
> On Wed, 24 May 2023 at 21:26, Andrew Schofield
> <an...@outlook.com> wrote:
>>
>> Hi Stanislav,
>> Thanks for your email. You bring up some interesting points.
>>
>> 1) Tiered storage
>> I think the situation here for fetching historical data is equivalent to what happens if a user resets the committed offset for a consumer
>> group back to an earlier point in time. So, I will mention this in the next update to the KIP document but I think there's nothing
>> especially different here.
>>
>> 2) SSO initialized to the latest offset
>> The KIP does mention that it is possible for an administrator to set the SSO using either AdminClient.alterShareGroupOffsets or
>> kafka-share-groups.sh. It is entirely intentional that there is no KafkaConsumer config for initializing the SSO. I know that's how it
>> can be done for consumer groups, but it suffers from the situation where different consumers have different opinions about
>> the initial value (earliest vs latest) and then the first one in wins. Also, KIP-842 digs into some problems with how consumer
>> group offset reset works (https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms) so
>> I've tried to sidestep those problems too.
>>
>> Another possibility is to follow KIP-848 which proposes that AdminClient.incrementalAlterConfigs is enhanced to support a new
>> resource type called GROUP and supporting a dynamic group config in this manner would give a single point of control.
>>
>> 3) Durable storage
>> The KIP does not yet describe how durable storage works. I have a few ideas that I want to flesh out before updating the KIP.
>>
>> I will rule out using a compacted topic though. The problem is that each record on a compacted topic is a key:value pair, and
>> it's not obvious what to use as the key. If it's the share group name, it needs the entire in-flight record state to be recorded in
>> one hit, which is extremely inefficient.
>>
>> 4) Batch acknowledgement
>> You are correct that compression makes delivery and acknowledgement of individual messages within a compressed batch
>> more complicated. Again, I will defer a proper answer here until I've dug more deeply.
>>
>> 5) Member management
>> Member management will be similar to consumer groups. I anticipate that it will build entirely on the new consumer group
>> protocol in KIP-848. There seems little benefit in supporting the legacy protocol when this KIP is targeting versions of Kafka
>> which will all have KIP-848.
>>
>> The two cases you mention:
>> i) If a bad consumer doesn't even heartbeat, it will be ejected from the group. This does not involve a rebalance.
>> ii) If a bad consumer heartbeats but always times out message processing, it will slow the advancement of the SSO/SEO. There
>> is the possibility that such a consumer would invalidate completely valid messages. In order to do this, it would have to acquire
>> the same set of messages repeatedly, to the exclusion of other consumers, and thus bump the delivery count to the limit.
>> This is unlikely but not impossible.
>>
>> 6) Processing semantics
>> Delivery is at-least-once.
>>
>> 7) Acronyms
>> I hadn't thought about the implications of "Kafka SEO". I think I'll change it to "Share Partition Start Offset" (SPSO) and
>> "Share Partition End Offset" (SPEO).
>>
>> There is a lot of work ahead for this KIP. I intend to work on the protocol changes next.
>>
>> Thanks for getting involved in the discussion.
>> Andrew
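
On point 2 of the quoted reply: AdminClient.alterShareGroupOffsets and kafka-share-groups.sh
are the mechanisms named for setting the SSO. A sketch of what the admin call might look like;
the method name comes from the reply, but the parameter and result shapes here are assumptions
modelled on the existing Admin.alterConsumerGroupOffsets:

    // Sketch: an administrator moves a share group's start offset for one partition.
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    try (Admin admin = Admin.create(config)) {
        Map<TopicPartition, OffsetAndMetadata> newStart =
                Map.of(new TopicPartition("payments", 0), new OffsetAndMetadata(0L));  // back to the start
        admin.alterShareGroupOffsets("my-share-group", newStart).all().get();   // signature assumed
    }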



Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Andrew Schofield <an...@live.com>.
Hi Luke,
Thanks for your comments.

1) I expect that fetch-from-follower will not be supported for share groups. If you think about it,
FFF gives freedom to fetch records from a nearby broker, but it does not also give the
ability to commit offsets to a nearby broker. For a share-group, the share-partition leader is
intimately involved in which records are fetched and acknowledged.

2) I have two big areas to fill in with the KIP - the RPCs and the storage. I’m working on the
RPCs now. The storage will be next.

3) Yes, we need new metrics. I’ll put a placeholder in the next update to the KIP. I think it will
be easy to enumerate them once the proposal has stabilised.

Thanks,
Andrew
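
For readers unfamiliar with fetch-from-follower (KIP-392): it is driven by the consumer's
client.rack setting together with a broker-side replica selector, as in the sketch below. The
point above is that this mechanism is not expected to carry over to share groups, because the
share-partition leader has to mediate which records are acquired and acknowledged.

    // Existing fetch-from-follower (KIP-392) setup, shown for contrast only.
    // Broker side (server.properties):
    //   broker.rack=rack-a
    //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
    // Consumer side: label the client with its rack so fetches can go to a nearby replica.
    Properties consumerProps = new Properties();
    consumerProps.put("client.rack", "rack-a");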




Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Luke Chen <sh...@gmail.com>.
Hi Andrew,

Thanks for the KIP.
Some high level questions:
1. How do we handle the "fetch from follower" case?
It looks like in the current design, each call needs to go to the "share-partition
leader", where the shared state is stored. Is my understanding correct?

2. Where is the state info stored?
It looks like we only store it in the memory of the "share-partition
leader". What happens after the leader crashes and moves to another ISR
replica?

3. New metrics needed
Since we're introducing a new kind of consumer group, I think there should
be new metrics added for client and broker to monitor them.

Thank you.
Luke


Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Satish Duggana <sa...@gmail.com>.
Minor correction on 103, latest instead of earliest for SPSO default value.

103 It talks about SPSO values, latest being the default, and the user
can reset it to a target offset or timestamp. What is the maximum value
for SPEO? It is good to clarify what the maximum value could be for
SPSO and SPEO. Could it be HW or LogStableOffset or some other value?

Thanks,
Satish.

> >
> > 6. Processing Semantics (exactly once)
> >
> > > The delivery counts are only maintained approximately and the Acquired
> > state is not persisted.
> >
> > Does this introduce the risk of zombie consumers on share-partition-leader
> > failure? i.e restarting and giving another consumer the acquired state for
> > the same record
> >
> > I notice that the KIP says:
> > > Finally, this KIP does not include support for acknowledging delivery
> > using transactions for exactly-once semantics.
> > at the very end. It would be helpful to address this earlier in the
> > example, as one of the key points. And it would be good to be clearer on
> > what the processing semantics are. They seem to be *at-least-once* to me.
> >
> >
> > 7. nit: Acronyms
> >
> > I feel like SSO and SEO may be bad acronyms. Googling "Kafka SEO" is bound
> > to return weird results.
> > What do we think about the tradeoff of using more-unique acronyms (like
> > SGEO, SSGO) at the expense of one extra letter?
> >
> > Again - thanks for working on this! I think it's a great initiative. I'm
> > excited to see us perfect this proposal and enable a brand new use case in
> > Kafka!
> >
> > Best,
> > Stanislav
> >
> >
> >
> > On Mon, May 15, 2023 at 2:55 PM Andrew Schofield <an...@live.com>
> > wrote:
> >
> > > Hi,
> > > I would like to start a discussion thread on KIP-932: Queues for Kafka.
> > > This KIP proposes an alternative to consumer groups to enable cooperative
> > > consumption by consumers without partition assignment. You end up with
> > > queue semantics on top of regular Kafka topics, with per-message
> > > acknowledgement and automatic handling of messages which repeatedly fail to
> > > be processed.
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
> > >
> > > Please take a look and let me know what you think.
> > >
> > > Thanks.
> > > Andrew
> >
> >
> >
> > --
> > Best,
> > Stanislav

Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Satish Duggana <sa...@gmail.com>.
Hi Andrew,
Thanks for the nice KIP on a very interesting feature about
introducing some of the traditional MessageQueue semantics to Kafka.
It is good to see that we are extending the existing consumer group
concepts and related mechanisms for shared subscriptions instead of
bringing any large architectural/protocol changes.

This KIP talks about introducing a durable subscription feature for
topics with multiple consumers consuming messages in parallel from a
single topic partition.

101 Are you planning to extend this functionality to queueing
semantics like JMS point-to-point style in the future?

102 When a message is rejected by the target consumer, how do users
know which records/offsets were dropped - whether because of a
rejection ack or because of timeouts etc. - before DLQs are
introduced?

103 It talks about SPSO values, earliest being the default, and the
user can reset it to a target offset/timestamp. What is the maximum
value for the SPEO? It would be good to clarify the maximum possible
values for the SPSO and SPEO. Can they be the HW, the LogStableOffset,
or some other value?

104 The KIP mentions "share.delivery.count.limit" as the maximum
number of delivery attempts for a record delivered to a share group.
But the actual number of deliveries may be more than this number: the
leader may fail to update the delivery count, or the leader or
consumer may fail and more delivery attempts may be made later. So it
may effectively be a lower bound on delivery attempts rather than a
strict maximum.
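
To make this concrete, here is a hypothetical sequence with
share.delivery.count.limit=2, assuming (as the KIP says) that delivery
counts are only maintained approximately:

1. Offset 100 is acquired by consumer A; its delivery count becomes 1.
2. Consumer A times out; offset 100 is acquired by consumer B; its
   delivery count becomes 2.
3. The share-partition leader fails before the count of 2 is made durable.
4. The new leader sees a count of 1, so offset 100 can legally be
   delivered a third time.

So the config bounds the recorded attempts, not necessarily the actual
deliveries.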

Thanks,
Satish.



Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Andrew Schofield <an...@outlook.com>.
Hi Stanislav,
Thanks for your email. You bring up some interesting points.

1) Tiered storage
I think the situation here for fetching historical data is equivalent to what happens if a user resets the committed offset for a consumer
group back to an earlier point in time. So, I will mention this in the next update to the KIP document but I think there's nothing
especially different here.

2) SSO initialized to the latest offset
The KIP does mention that it is possible for an administrator to set the SSO using either AdminClient.alterShareGroupOffsets or
kafka-share-groups.sh. It is entirely intentional that there is no KafkaConsumer config for initializing the SSO. I know that's how it
can be done for consumer groups, but it suffers from the situation where different consumers have different opinions about
the initial value (earliest vs latest) and then the first one in wins. Also, KIP-842 digs into some problems with how consumer
group offset reset works (https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms) so
I've tried to sidestep those problems too.
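
Purely as an illustration of the problem being sidestepped, here is the existing consumer-group behaviour I want to avoid for share groups (standard consumer configs; the broker address and group name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties memberA = new Properties();
memberA.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
memberA.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
memberA.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // this member wants history

Properties memberB = new Properties();
memberB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
memberB.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
memberB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");    // this member wants only new data

// With no committed offset, whichever member fetches a partition first applies its own
// reset policy and then commits, so the other member's preference is silently ignored.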

Another possibility is to follow KIP-848, which proposes that AdminClient.incrementalAlterConfigs is enhanced to support a new
resource type called GROUP; supporting a dynamic group config in this manner would give a single point of control.
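
Just to sketch what that single point of control might look like - note that the GROUP resource type is only proposed by KIP-848, and "share.auto.offset.reset" is an invented config name used here purely for illustration:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

static void setShareGroupOffsetReset(Admin admin) throws Exception {
    // Hypothetical GROUP resource type (KIP-848) and hypothetical config name.
    ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
    AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("share.auto.offset.reset", "earliest"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
}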

3) Durable storage
The KIP does not yet describe how durable storage works. I have a few ideas that I want to flesh out before updating the KIP.

I will rule out using a compacted topic though. The problem is that each record on a compacted topic is a key:value pair, and
it's not obvious what to use as the key. If it's the share group name, it needs the entire in-flight record state to be recorded in
one hit, which is extremely inefficient.
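
To illustrate the objection with a sketch of the data shape (not a proposed design), keying by share group name would force something like the following to be re-serialized and rewritten in full for every single acknowledgement:

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical value shape if the compacted-topic key were just the share group name.
class ShareGroupState {
    Map<TopicPartition, PartitionState> partitions;

    static class PartitionState {
        long startOffset;                          // share-partition start offset (SSO)
        long endOffset;                            // share-partition end offset (SEO)
        Map<Long, Integer> deliveryCountByOffset;  // state of every in-flight record
    }
}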

4) Batch acknowledgement
You are correct that compression makes delivery and acknowledgement of individual messages within a compressed batch
more complicated. Again, I will defer a proper answer here until I've dug more deeply.

5) Member management
Member management will be similar to consumer groups. I anticipate that it will build entirely on the new consumer group
protocol in KIP-848. There seems little benefit in supporting the legacy protocol when this KIP is targeting versions of Kafka
which will all have KIP-848.

The two cases you mention:
i) If a bad consumer doesn't even heartbeat, it will be ejected from the group. This does not involve a rebalance.
ii) If a bad consumer heartbeats but always times out message processing, it will slow the advancement of the SSO/SEO. There
is the possibility that such a consumer would invalidate completely valid messages. In order to do this, it would have to acquire
the same set of messages repeatedly, to the exclusion of other consumers, and thus bump the delivery count to the limit.
This is unlikely but not impossible.

6) Processing semantics
Delivery is at-least-once.
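
For readers newer to the terminology: at-least-once falls out of processing first and acknowledging afterwards. The sketch below shows that ordering with today's classic consumer (auto-commit disabled; process() is a stand-in for application logic); a share group would acknowledge individual records rather than commit offsets, but the argument is the same:

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

static void runAtLeastOnce(KafkaConsumer<String, String> consumer) {
    consumer.subscribe(List.of("my-topic"));   // placeholder topic name
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record);                   // application logic (stand-in)
        }
        // A crash after process() but before this commit means the same records are
        // fetched and processed again on restart - hence at-least-once.
        consumer.commitSync();
    }
}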

7) Acronyms
I hadn't thought about the implications of "Kafka SEO". I think I'll change it to "Share Partition Start Offset" (SPSO) and
"Share Partition End Offset" (SPEO).

There is a lot of work ahead for this KIP. I intend to work on the protocol changes next.

Thanks for getting involved in the discussion.
Andrew


Re: [DISCUSS] KIP-932: Queues for Kafka

Posted by Stanislav Kozlovski <st...@confluent.io.INVALID>.
Hey Andrew!

Kudos on the proposal. It is very well written - a joy to read. It is
definitely an interesting solution to the queueing problem - I would not
have guessed we could solve it like this. Thank you for working on this.

Happy to get the discussion started - I have a few comments/questions on
first read:

1. Tiered Storage

I notice no mention of Tiered Storage (KIP-405). Does that complicate the
design, especially when fetching historical data? It would be good to have
at least one sentence mentioning it, even if it doesn't impact it. Right
now I'm unsure if it was considered.

2. SSO initialized to the latest offset

> "By default, the SSO for each share-partition is initialized to the
latest offset for the corresponding topic-partitions."

Have we considered allowing this to be configurable to latest/earliest?
This would be consistent with the auto.offset.reset config of vanilla
consumer groups.
Thinking from a user's perspective, it sounds valid to want to start from
the start of a topic when starting a share group. Historical processing
comes to mind.

3. Durable Storage

The KIP mentions that "The cluster records this information durably", which
implies that it saves it somewhere. Does the ShareCoordinator have its own
topic? Would it be compacted?

In particular, I am interested in what such a topic's retention would be
like. The vanilla consumer offsets topic has some special retention
semantics (KIP-211) where we start counting the retention after the
consumer group becomes empty (inactive) - the default being 7 days. Need to
make sure the retention here isn't too short either, as the offsets topic
originally had 24 hours of retention and that proved problematic.

In general, some extra detail about the persistence would be greatly
appreciated!

4. Batch Acknowledgement

> "In the situation where some records in a batch have been released or
rejected separately, subsequent fetches of those records are more likely to
have gaps."

Can we expand a bit more on this edge case? I am interested in learning
what gets returned on subsequent fetch requests.
In particular, how does this work with compression? As far as I remember,
we can compress the whole batch there, which might make individual record
filtering tricky.

5. Member Management

How is consumer group member management handled? I didn't see any specific
mention - is it the same as a vanilla group?
In particular - how will bad consumers be handled?

I guess I see two cases:
1. bad consumer that doesn't even heartbeat
2. bad consumer that heartbeats well but for some reason every message
processing times out, e.g. imagine it was network partitioned from some
third-party system that is a critical part of its message processing loop

One evident problem I can foresee in production systems is one (or a few)
slow consumer applications bringing the SSO/SEO advancement down to a crawl.
Imagine an example where the same consumer app always hits the timeout
limit - what would the behavior be in such a case? Do we keep that consumer
app indefinitely (if so, do we run the risk of having it invalidate
completely valid messages)? Are there any equivalents to the consumer group
rebalances which fence off such bad consumers?

6. Processing Semantics (exactly once)

> The delivery counts are only maintained approximately and the Acquired
state is not persisted.

Does this introduce the risk of zombie consumers on share-partition-leader
failure, i.e. restarting and giving another consumer the acquired state for
the same record?

I notice that the KIP says:
> Finally, this KIP does not include support for acknowledging delivery
using transactions for exactly-once semantics.
at the very end. It would be helpful to address this earlier in the
example, as one of the key points. And it would be good to be clearer on
what the processing semantics are. They seem to be *at-least-once* to me.


7. nit: Acronyms

I feel like SSO and SEO may be bad acronyms. Googling "Kafka SEO" is bound
to return weird results.
What do we think about the tradeoff of using more-unique acronyms (like
SGEO, SSGO) at the expense of one extra letter?

Again - thanks for working on this! I think it's a great initiative. I'm
excited to see us perfect this proposal and enable a brand new use case in
Kafka!

Best,
Stanislav






-- 
Best,
Stanislav