You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Vahid S Hashemian <va...@us.ibm.com> on 2018/01/12 18:21:35 UTC

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

There has been no further discussion on this KIP for about two months.
So I thought I'd provide the scoop hoping it would spark additional 
feedback and move the KIP forward.

The KIP proposes a method to preserve group offsets as long as the group 
is not in Empty state (even when offsets are committed very rarely), and 
start the offset expiration of the group as soon as the group becomes 
Empty.
It suggests dropping the `retention_time` field from the `OffsetCommit` 
request and, instead, enforcing it via the broker config 
`offsets.retention.minutes` for all groups. In other words, all groups 
will have the same retention time.
The KIP presumes that this global retention config would suffice common 
use cases and does not lead to, e.g., unmanageable offset cache size (for 
groups that don't need to stay around that long). It suggests opening 
another KIP if this global retention setting proves to be problematic in 
the future. It was suggested earlier in the discussion thread that the KIP 
should propose a per-group retention config to circumvent this risk.

I look forward to hearing your thoughts. Thanks!

--Vahid




From:   "Vahid S Hashemian" <va...@us.ibm.com>
To:     dev <de...@kafka.apache.org>
Date:   10/18/2017 04:45 PM
Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer 
Group Offsets



Hi all,

I created a KIP to address the group offset expiration issue reported in 
KAFKA-4682:
https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=


Your feedback is welcome!

Thanks.
--Vahid






Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Correction.

The offsets will be removed when the expiration timestamp is reached.
We will be setting the expiration timestamp in a smart manner to cover 
both the issue reported in the JIRA, and also the case of unsubscribed 
topics.

I'll detail this in the KIP, and send a notification when the update is 
ready for review.

Apologies for the confusion.
--Vahid




From:   "Vahid S Hashemian" <va...@us.ibm.com>
To:     dev@kafka.apache.org
Date:   03/01/2018 11:43 AM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hi Jason,

Thanks for your feedback.

If we want to keep the per-partition expiration timestamp, then the scope 
of the KIP will be significantly reduced. In other words, we will just be 
deleting offsets from cache as before but only if the group is in Empty 
state.
This would not cause any negative backward compatibility concern, since 
offsets won't expire any earlier than before. They may be removed at the 
same time (if group is already Empty) or later (if group is not Empty yet) 

than before.
If I'm not mistaken we no longer need to change the internal schema either 

- no need for keeping track of when group becomes Empty.
Would this make sense? If so, I'll update the KIP with this reduced-scope 
proposal.

Regarding manual offset removal I was thinking of synching up the new 
consumer group command with the old command in which a '--topic' option 
was supported with '--delete'.

Regarding the other JIRA you referred to, sure, I'll add that in the KIP.

Thanks.
--Vahid



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   02/28/2018 12:10 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Thanks for the response. Replies below:


> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.


Mainly I wanted to ensure that we would be holding offsets at least as 
long
as what was requested by older clients. If we hold it for longer, that's
probably fine, but there may be application behavior which would break if
offsets are expired earlier than expected.

2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp 
anymore.
> Is there a particular reason for your suggestion of storing the 
timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?


Although it is not exposed to clients, we still have to manage
compatibility of the schema across versions, so I think we should include
it in the KIP. The reason I was thinking of using the time that the group
became Empty is that the configured timeout might change. I think my
expectation as a user would be that a timeout change would also apply to
existing groups, but I'm not sure if there are any reasons not to so.

3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually 
removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?


I think manual removal of offsets for this case is a bit of a tough sell
for usability. Did you imagine it happening automatically in the consumer
through an API?

I'm finding it increasingly frustrating that the generic group coordinator
is limited in its decision making since it cannot see the subscription
metadata. It is the same problem in Dong's KIP. I think I would suggest
that, at a minimum, we leave the door open to enforcing offset expiration
either 1) when the group becomes empty, and 2) when the corresponding
partition is removed from the subscription. Perhaps that means we need to
keep the individual offset expiration timestamp after all. Actually we
would probably need it anyway to handle "simple" consumer groups which are
always Empty.

One additional note: I have seen recently a case where the offset cache
caused an OOM on the broker. I looked into it and found that most of the
cache was used for storing console consumer offsets. I know you had a 
patch
before which turned off auto-commit when the groupId was generated by
ConsoleConsumer. Maybe we could lump that change into this KIP?

Thanks,
Jason




On Fri, Feb 23, 2018 at 4:08 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi Jason,
>
> Thanks a lot for reviewing the KIP.
>
> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.
>
> 2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp 
anymore.
> Is there a particular reason for your suggestion of storing the 
timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?
>
> 3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually 
removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?
>
> I'll wait for your feedback and clarification on the above items before
> updating the KIP.
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change 
configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata messsage 
format?
> I
> think we need to add a new field to indicate the time that the group 
state
> changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer
> in use is interesting. At the moment, it's difficult for the coordinator
> to
> know that a partition is no longer being fetched because it is agnostic 
to
> subscription state (the group coordinator is used for more than just
> consumer groups). Even if we allow the coordinator to read subscription
> state to tell which topics are no longer being consumed, we might need
> some
> additional bookkeeping to keep track of /when/ the consumer stopped
> subscribing to a particular topic. Or maybe we can reset this expiration
> timer after every coordinator change when the new coordinator reads the
> offsets and group metadata? I am not sure how common this use case is 
and
> whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics 
that
> > are no longer "active" in the group.
> > I think it becomes important to determine what constitutes that a 
topic
> is
> > no longer active: If we use per-partition expiration we would manually
> > choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specify a
> > per-partition retention, why not do the intervention in some other 
way:
> >
> > One alternative for this intervention, to favor the simplicity of the
> > suggested protocol in the KIP, is to improve upon the just introduced
> > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> in
> > the group. This is what the old ZooKeeper based group management
> supported
> > anyway, and we would just be leveling the group deletion features of 
the
> > Kafka-based group management with the ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed 
we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wu...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group 
is
> > empty and then (after the right time has passed) expire the entire 
group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> mirroring
> > those 26 topics. The group will then have 26 committed offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> that
> > is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for 
topics
> A
> > through Y will stay around as long as this mirroring group name 
exists.
> >
> > In the current implementation that already exists (prior to this KIP), 

I
> > belive that committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> 
wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> and
> > > typically results in message skippage/loss, vs the proposed solution
> is
> > > much more intuitive behavior.
> > >
> > > Plus it's more wire efficient because consumers no longer need to
> commit
> > > offsets for partitions that have no new messages just to keep those
> > offsets
> > > alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> months.
> > >> So I thought I'd provide the scoop hoping it would spark additional
> > >> feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > group
> > >> is not in Empty state (even when offsets are committed very 
rarely),
> > and
> > >> start the offset expiration of the group as soon as the group 
becomes
> > >> Empty.
> > >> It suggests dropping the `retention_time` field from the
> `OffsetCommit`
> > >> request and, instead, enforcing it via the broker config
> > >> `offsets.retention.minutes` for all groups. In other words, all
> groups
> > >> will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice
> common
> > >> use cases and does not lead to, e.g., unmanageable offset cache 
size
> > (for
> > >> groups that don't need to stay around that long). It suggests 
opening
> > >> another KIP if this global retention setting proves to be 
problematic
> > in
> > >> the future. It was suggested earlier in the discussion thread that
> the
> > KIP
> > >> should propose a per-group retention config to circumvent this 
risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > >> To:     dev <de...@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue 
reported
> > in
> > >> KAFKA-4682:
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> >
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > 9xkoI&e=
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>









Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks for your feedback.

If we want to keep the per-partition expiration timestamp, then the scope 
of the KIP will be significantly reduced. In other words, we will just be 
deleting offsets from cache as before but only if the group is in Empty 
state.
This would not cause any negative backward compatibility concern, since 
offsets won't expire any earlier than before. They may be removed at the 
same time (if group is already Empty) or later (if group is not Empty yet) 
than before.
If I'm not mistaken we no longer need to change the internal schema either 
- no need for keeping track of when group becomes Empty.
Would this make sense? If so, I'll update the KIP with this reduced-scope 
proposal.

Regarding manual offset removal I was thinking of synching up the new 
consumer group command with the old command in which a '--topic' option 
was supported with '--delete'.

Regarding the other JIRA you referred to, sure, I'll add that in the KIP.

Thanks.
--Vahid



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   02/28/2018 12:10 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Thanks for the response. Replies below:


> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.


Mainly I wanted to ensure that we would be holding offsets at least as 
long
as what was requested by older clients. If we hold it for longer, that's
probably fine, but there may be application behavior which would break if
offsets are expired earlier than expected.

2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp 
anymore.
> Is there a particular reason for your suggestion of storing the 
timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?


Although it is not exposed to clients, we still have to manage
compatibility of the schema across versions, so I think we should include
it in the KIP. The reason I was thinking of using the time that the group
became Empty is that the configured timeout might change. I think my
expectation as a user would be that a timeout change would also apply to
existing groups, but I'm not sure if there are any reasons not to so.

3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually 
removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?


I think manual removal of offsets for this case is a bit of a tough sell
for usability. Did you imagine it happening automatically in the consumer
through an API?

I'm finding it increasingly frustrating that the generic group coordinator
is limited in its decision making since it cannot see the subscription
metadata. It is the same problem in Dong's KIP. I think I would suggest
that, at a minimum, we leave the door open to enforcing offset expiration
either 1) when the group becomes empty, and 2) when the corresponding
partition is removed from the subscription. Perhaps that means we need to
keep the individual offset expiration timestamp after all. Actually we
would probably need it anyway to handle "simple" consumer groups which are
always Empty.

One additional note: I have seen recently a case where the offset cache
caused an OOM on the broker. I looked into it and found that most of the
cache was used for storing console consumer offsets. I know you had a 
patch
before which turned off auto-commit when the groupId was generated by
ConsoleConsumer. Maybe we could lump that change into this KIP?

Thanks,
Jason




On Fri, Feb 23, 2018 at 4:08 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi Jason,
>
> Thanks a lot for reviewing the KIP.
>
> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.
>
> 2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp 
anymore.
> Is there a particular reason for your suggestion of storing the 
timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?
>
> 3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually 
removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?
>
> I'll wait for your feedback and clarification on the above items before
> updating the KIP.
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change 
configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata messsage 
format?
> I
> think we need to add a new field to indicate the time that the group 
state
> changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer
> in use is interesting. At the moment, it's difficult for the coordinator
> to
> know that a partition is no longer being fetched because it is agnostic 
to
> subscription state (the group coordinator is used for more than just
> consumer groups). Even if we allow the coordinator to read subscription
> state to tell which topics are no longer being consumed, we might need
> some
> additional bookkeeping to keep track of /when/ the consumer stopped
> subscribing to a particular topic. Or maybe we can reset this expiration
> timer after every coordinator change when the new coordinator reads the
> offsets and group metadata? I am not sure how common this use case is 
and
> whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics 
that
> > are no longer "active" in the group.
> > I think it becomes important to determine what constitutes that a 
topic
> is
> > no longer active: If we use per-partition expiration we would manually
> > choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specify a
> > per-partition retention, why not do the intervention in some other 
way:
> >
> > One alternative for this intervention, to favor the simplicity of the
> > suggested protocol in the KIP, is to improve upon the just introduced
> > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> in
> > the group. This is what the old ZooKeeper based group management
> supported
> > anyway, and we would just be leveling the group deletion features of 
the
> > Kafka-based group management with the ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed 
we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wu...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group 
is
> > empty and then (after the right time has passed) expire the entire 
group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> mirroring
> > those 26 topics. The group will then have 26 committed offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> that
> > is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for 
topics
> A
> > through Y will stay around as long as this mirroring group name 
exists.
> >
> > In the current implementation that already exists (prior to this KIP), 
I
> > belive that committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> 
wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> and
> > > typically results in message skippage/loss, vs the proposed solution
> is
> > > much more intuitive behavior.
> > >
> > > Plus it's more wire efficient because consumers no longer need to
> commit
> > > offsets for partitions that have no new messages just to keep those
> > offsets
> > > alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> months.
> > >> So I thought I'd provide the scoop hoping it would spark additional
> > >> feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > group
> > >> is not in Empty state (even when offsets are committed very 
rarely),
> > and
> > >> start the offset expiration of the group as soon as the group 
becomes
> > >> Empty.
> > >> It suggests dropping the `retention_time` field from the
> `OffsetCommit`
> > >> request and, instead, enforcing it via the broker config
> > >> `offsets.retention.minutes` for all groups. In other words, all
> groups
> > >> will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice
> common
> > >> use cases and does not lead to, e.g., unmanageable offset cache 
size
> > (for
> > >> groups that don't need to stay around that long). It suggests 
opening
> > >> another KIP if this global retention setting proves to be 
problematic
> > in
> > >> the future. It was suggested earlier in the discussion thread that
> the
> > KIP
> > >> should propose a per-group retention config to circumvent this 
risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > >> To:     dev <de...@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue 
reported
> > in
> > >> KAFKA-4682:
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> >
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > 9xkoI&e=
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>





Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

Thanks for the response. Replies below:


> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.


Mainly I wanted to ensure that we would be holding offsets at least as long
as what was requested by older clients. If we hold it for longer, that's
probably fine, but there may be application behavior which would break if
offsets are expired earlier than expected.

2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp anymore.
> Is there a particular reason for your suggestion of storing the timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?


Although it is not exposed to clients, we still have to manage
compatibility of the schema across versions, so I think we should include
it in the KIP. The reason I was thinking of using the time that the group
became Empty is that the configured timeout might change. I think my
expectation as a user would be that a timeout change would also apply to
existing groups, but I'm not sure if there are any reasons not to so.

3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?


I think manual removal of offsets for this case is a bit of a tough sell
for usability. Did you imagine it happening automatically in the consumer
through an API?

I'm finding it increasingly frustrating that the generic group coordinator
is limited in its decision making since it cannot see the subscription
metadata. It is the same problem in Dong's KIP. I think I would suggest
that, at a minimum, we leave the door open to enforcing offset expiration
either 1) when the group becomes empty, and 2) when the corresponding
partition is removed from the subscription. Perhaps that means we need to
keep the individual offset expiration timestamp after all. Actually we
would probably need it anyway to handle "simple" consumer groups which are
always Empty.

One additional note: I have seen recently a case where the offset cache
caused an OOM on the broker. I looked into it and found that most of the
cache was used for storing console consumer offsets. I know you had a patch
before which turned off auto-commit when the groupId was generated by
ConsoleConsumer. Maybe we could lump that change into this KIP?

Thanks,
Jason




On Fri, Feb 23, 2018 at 4:08 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi Jason,
>
> Thanks a lot for reviewing the KIP.
>
> 1. I think my suggestion in the KIP was more towards ignoring the client
> provided values and use a large enough broker config value instead. It
> seems the question comes down to whether we still want to honor the
> `retention_time` field in the old requests. With the new request (as per
> this KIP) the client would not be able to overwrite the broker retention
> config. Your suggestion provides kind of a back door for the overwrite.
> Also, since different offset commits associated with a group can
> potentially use different `retention_time` values, it's probably
> reasonable to use the maximum of all those values (including the broker
> config) as the group offset retention.
>
> 2. If I'm not mistake you are referring to potential changes in
> `GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation
> matter and frankly, have not fully thought about it, but I agree that it
> needs to be updated to include either the timestamp the group becomes
> `Empty` or maybe the expiration timestamp of the group. And perhaps, we
> would not need to store per partition offset expiration timestamp anymore.
> Is there a particular reason for your suggestion of storing the timestamp
> the group becomes `Empty`, vs the expiration timestamp of the group?
>
> 3. To limit the scope of the KIP I would prefer to handle this matter
> separately if it doesn't have to be addressed as part of this change. It
> probably needs be addressed at some point and I'll mention it in the KIP
> so we have it documented. Do you think my suggestion of manually removing
> topic offsets from group (as an interim solution) is worth additional
> discussion / implementation?
>
> I'll wait for your feedback and clarification on the above items before
> updating the KIP.
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata messsage format?
> I
> think we need to add a new field to indicate the time that the group state
> changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer
> in use is interesting. At the moment, it's difficult for the coordinator
> to
> know that a partition is no longer being fetched because it is agnostic to
> subscription state (the group coordinator is used for more than just
> consumer groups). Even if we allow the coordinator to read subscription
> state to tell which topics are no longer being consumed, we might need
> some
> additional bookkeeping to keep track of /when/ the consumer stopped
> subscribing to a particular topic. Or maybe we can reset this expiration
> timer after every coordinator change when the new coordinator reads the
> offsets and group metadata? I am not sure how common this use case is and
> whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics that
> > are no longer "active" in the group.
> > I think it becomes important to determine what constitutes that a topic
> is
> > no longer active: If we use per-partition expiration we would manually
> > choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specify a
> > per-partition retention, why not do the intervention in some other way:
> >
> > One alternative for this intervention, to favor the simplicity of the
> > suggested protocol in the KIP, is to improve upon the just introduced
> > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> in
> > the group. This is what the old ZooKeeper based group management
> supported
> > anyway, and we would just be leveling the group deletion features of the
> > Kafka-based group management with the ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wu...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group is
> > empty and then (after the right time has passed) expire the entire group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> mirroring
> > those 26 topics. The group will then have 26 committed offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> that
> > is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for topics
> A
> > through Y will stay around as long as this mirroring group name exists.
> >
> > In the current implementation that already exists (prior to this KIP), I
> > belive that committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> and
> > > typically results in message skippage/loss, vs the proposed solution
> is
> > > much more intuitive behavior.
> > >
> > > Plus it's more wire efficient because consumers no longer need to
> commit
> > > offsets for partitions that have no new messages just to keep those
> > offsets
> > > alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> months.
> > >> So I thought I'd provide the scoop hoping it would spark additional
> > >> feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > group
> > >> is not in Empty state (even when offsets are committed very rarely),
> > and
> > >> start the offset expiration of the group as soon as the group becomes
> > >> Empty.
> > >> It suggests dropping the `retention_time` field from the
> `OffsetCommit`
> > >> request and, instead, enforcing it via the broker config
> > >> `offsets.retention.minutes` for all groups. In other words, all
> groups
> > >> will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice
> common
> > >> use cases and does not lead to, e.g., unmanageable offset cache size
> > (for
> > >> groups that don't need to stay around that long). It suggests opening
> > >> another KIP if this global retention setting proves to be problematic
> > in
> > >> the future. It was suggested earlier in the discussion thread that
> the
> > KIP
> > >> should propose a per-group retention config to circumvent this risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > >> To:     dev <de...@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue reported
> > in
> > >> KAFKA-4682:
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> >
> 3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > 9xkoI&e=
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks a lot for reviewing the KIP.

1. I think my suggestion in the KIP was more towards ignoring the client 
provided values and use a large enough broker config value instead. It 
seems the question comes down to whether we still want to honor the 
`retention_time` field in the old requests. With the new request (as per 
this KIP) the client would not be able to overwrite the broker retention 
config. Your suggestion provides kind of a back door for the overwrite. 
Also, since different offset commits associated with a group can 
potentially use different `retention_time` values, it's probably 
reasonable to use the maximum of all those values (including the broker 
config) as the group offset retention.

2. If I'm not mistake you are referring to potential changes in 
`GROUP_METADATA_VALUE_SCHEMA`. I saw this as an internal implementation 
matter and frankly, have not fully thought about it, but I agree that it 
needs to be updated to include either the timestamp the group becomes 
`Empty` or maybe the expiration timestamp of the group. And perhaps, we 
would not need to store per partition offset expiration timestamp anymore. 
Is there a particular reason for your suggestion of storing the timestamp 
the group becomes `Empty`, vs the expiration timestamp of the group?

3. To limit the scope of the KIP I would prefer to handle this matter 
separately if it doesn't have to be addressed as part of this change. It 
probably needs be addressed at some point and I'll mention it in the KIP 
so we have it documented. Do you think my suggestion of manually removing 
topic offsets from group (as an interim solution) is worth additional 
discussion / implementation?

I'll wait for your feedback and clarification on the above items before 
updating the KIP.

Thanks.
--Vahid



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   02/18/2018 01:16 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Sorry for the late response. The KIP looks good. A few comments:

1. I'm not quite sure I understand how you are handling old clients. It
sounds like you are saying that old clients need to change configuration?
I'd suggest 1) if an old client requests the default expiration, then we
use the updated behavior, and 2) if the old client requests a specific
expiration, we enforce it from the time the group becomes Empty.

2. Does this require a new version of the group metadata messsage format? 
I
think we need to add a new field to indicate the time that the group state
changed to Empty. This will allow us to resume the expiration timer
correctly after a coordinator change. Alternatively, we could reset the
expiration timeout after every coordinator move, but it would be nice to
have a definite bound on offset expiration.

3. The question about removal of offsets for partitions which are no 
longer
in use is interesting. At the moment, it's difficult for the coordinator 
to
know that a partition is no longer being fetched because it is agnostic to
subscription state (the group coordinator is used for more than just
consumer groups). Even if we allow the coordinator to read subscription
state to tell which topics are no longer being consumed, we might need 
some
additional bookkeeping to keep track of /when/ the consumer stopped
subscribing to a particular topic. Or maybe we can reset this expiration
timer after every coordinator change when the new coordinator reads the
offsets and group metadata? I am not sure how common this use case is and
whether it needs to be solved as part of this KIP.

Thanks,
Jason



On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Thanks James for sharing that scenario.
>
> I agree it makes sense to be able to remove offsets for the topics that
> are no longer "active" in the group.
> I think it becomes important to determine what constitutes that a topic 
is
> no longer active: If we use per-partition expiration we would manually
> choose a retention time that works for the particular scenario.
>
> That works, but since we are manually intervening and specify a
> per-partition retention, why not do the intervention in some other way:
>
> One alternative for this intervention, to favor the simplicity of the
> suggested protocol in the KIP, is to improve upon the just introduced
> DELETE_GROUPS API and allow for deletion of offsets of specific topics 
in
> the group. This is what the old ZooKeeper based group management 
supported
> anyway, and we would just be leveling the group deletion features of the
> Kafka-based group management with the ZooKeeper-based one.
>
> So, instead of deciding in advance when the offsets should be removed we
> would instantly remove them when we are sure that they are no longer
> needed.
>
> Let me know what you think.
>
> Thanks.
> --Vahid
>
>
>
> From:   James Cheng <wu...@gmail.com>
> To:     dev@kafka.apache.org
> Date:   02/01/2018 12:37 AM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Vahid,
>
> Under rejected alternatives, we had decided that we did NOT want to do
> per-partition expiration, and instead we wait until the entire group is
> empty and then (after the right time has passed) expire the entire group
> at once.
>
> I thought of one scenario that might benefit from per-partition
> expiration.
>
> Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> single partition, so 26 partitions. Let's say I have mirrormaker 
mirroring
> those 26 topics. The group will then have 26 committed offsets.
>
> Let's say I then change the whitelist on mirrormaker so that it only
> mirrors topic Z, but I keep the same consumer group name. (I imagine 
that
> is a common thing to do?)
>
> With the proposed design for this KIP, the committed offsets for topics 
A
> through Y will stay around as long as this mirroring group name exists.
>
> In the current implementation that already exists (prior to this KIP), I
> belive that committed offsets for topics A through Y will expire.
>
> How much do we care about this case?
>
> -James
>
> > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> >
> > Bumping this as I'd like to see it land...
> >
> > It's one of the "features" that tends to catch Kafka n00bs unawares 
and
> > typically results in message skippage/loss, vs the proposed solution 
is
> > much more intuitive behavior.
> >
> > Plus it's more wire efficient because consumers no longer need to 
commit
> > offsets for partitions that have no new messages just to keep those
> offsets
> > alive.
> >
> > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> >> There has been no further discussion on this KIP for about two 
months.
> >> So I thought I'd provide the scoop hoping it would spark additional
> >> feedback and move the KIP forward.
> >>
> >> The KIP proposes a method to preserve group offsets as long as the
> group
> >> is not in Empty state (even when offsets are committed very rarely),
> and
> >> start the offset expiration of the group as soon as the group becomes
> >> Empty.
> >> It suggests dropping the `retention_time` field from the 
`OffsetCommit`
> >> request and, instead, enforcing it via the broker config
> >> `offsets.retention.minutes` for all groups. In other words, all 
groups
> >> will have the same retention time.
> >> The KIP presumes that this global retention config would suffice 
common
> >> use cases and does not lead to, e.g., unmanageable offset cache size
> (for
> >> groups that don't need to stay around that long). It suggests opening
> >> another KIP if this global retention setting proves to be problematic
> in
> >> the future. It was suggested earlier in the discussion thread that 
the
> KIP
> >> should propose a per-group retention config to circumvent this risk.
> >>
> >> I look forward to hearing your thoughts. Thanks!
> >>
> >> --Vahid
> >>
> >>
> >>
> >>
> >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> >> To:     dev <de...@kafka.apache.org>
> >> Date:   10/18/2017 04:45 PM
> >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer
> >> Group Offsets
> >>
> >>
> >>
> >> Hi all,
> >>
> >> I created a KIP to address the group offset expiration issue reported
> in
> >> KAFKA-4682:
> >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> >>
> >>
> >> Your feedback is welcome!
> >>
> >> Thanks.
> >> --Vahid
> >>
> >>
> >>
> >>
> >>
> >>
> >
> >
> > --
> >
> > *Jeff Widman*
> > jeffwidman.com <
> https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> 9xkoI&e=
> > | 740-WIDMAN-J (943-6265)
> > <><
>
>
>
>
>
>





Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Matthias,

Thanks a lot for reviewing the KIP and the clarification on streams 
protocol type for consumer groups.
I have updated the KIP with your suggestion.
I'm assuming the cast vote so far will remain valid since this is a minor 
change.

Cheers,
--Vahid




From:   "Matthias J. Sax" <ma...@confluent.io>
To:     dev@kafka.apache.org
Date:   04/04/2018 06:28 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



I was just reading the whole KIP for the first time. Nice work!


One minor comment. In the table of the standalone consumer, the first
line, first column says:

> = Empty
> (protocolType = Some("consumer"))

I think this should be

> = Empty
> (protocolType != None)
Note, that for example KafkaStreams uses a different protocol type
(namely "stream"). Also, other consumer must implement their own
partition assignors, too, with other names.



-Matthias


On 3/26/18 1:44 PM, Vahid S Hashemian wrote:
> Hi all,
> 
> Thanks for the feedback on this KIP so far.
> 
> If there is no additional feedback, I'll start a vote on Wed.
> 
> Thanks.
> --Vahid
> 
> 

[attachment "signature.asc" deleted by Vahid S Hashemian/Silicon 
Valley/IBM] 




Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I was just reading the whole KIP for the first time. Nice work!


One minor comment. In the table of the standalone consumer, the first
line, first column says:

> = Empty
> (protocolType = Some("consumer"))

I think this should be

> = Empty
> (protocolType != None)
Note, that for example KafkaStreams uses a different protocol type
(namely "stream"). Also, other consumer must implement their own
partition assignors, too, with other names.



-Matthias


On 3/26/18 1:44 PM, Vahid S Hashemian wrote:
> Hi all,
> 
> Thanks for the feedback on this KIP so far.
> 
> If there is no additional feedback, I'll start a vote on Wed.
> 
> Thanks.
> --Vahid
> 
> 


Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi all,

Thanks for the feedback on this KIP so far.

If there is no additional feedback, I'll start a vote on Wed.

Thanks.
--Vahid


Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks for your feedback and suggestion.

I updated the name "empty_state_timestamp" to "current_state_timestamp" to 
expand its usage to all state changes. If you can think of a better name 
let me know.
And, I fixed the statement on Dead group state to include the coordinator 
change case.

Thanks!
--Vahid



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   03/17/2018 11:50 AM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Sorry for the delay. I've read through the current KIP and it looks good. 
I
had one minor suggestion: instead of making the timestamp in the group
metadata state message specific to the empty state transition (i.e.
"empty_state_timestamp"), could we leave it generic and let it indicate 
the
time of the current state change (whether it is to Empty, Stable, or
whatever)? Once a group becomes empty, we do not update the state until
deletion anyway, so the timestamp would not change.

Also a minor correction. When a group transitions to Dead, it does not
necessarily indicate offsets have removed. We also use this state when
there is a coordinator change and the group is unloaded from the
coordinator cache.

Thanks,
Jason




On Tue, Mar 6, 2018 at 4:41 PM, Vahid S Hashemian 
<vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> Thanks a lot for your clarification and feedback.
> Your statements below all seem reasonable to me.
>
> I have updated the KIP according to the conversation so far.
> It contains significant changes compared to the initial version, so it
> might be worth glancing over the whole thing one more time in case I've
> missed something :)
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   03/05/2018 03:42 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> On point #1 below: since the expiration timer starts ticking after the
> > group becomes Empty the expire_timestamp of group offsets will be set
> when
> > that transition occurs. In normal cases that expire_timestamp is
> > calculated as "current timestamp" + "broker's offset retention". Then 
if
> > an old client provides a custom retention, we probably need a way to
> store
> > that custom retention (and use it once the group becomes Empty). One
> place
> > to store it is in group metadata message, but the issue is we would be
> > introducing a new field only for backward compatibility (new clients
> don't
> > overwrite the broker's retention), unless we somehow want to support
> this
> > retention on a per-group basis. What do you think?
>
>
> Here's what I was thinking. The current offset commit schema looks like
> this:
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>   ExpireTimestmap => Long
>
> If we have any clients that ask for an explicit retention timeout, then 
we
> can continue using this schema and providing the current behavior. The
> offsets will be retained until they are individually expired.
>
> For newer clients or those that request the default retention, we can 
bump
> the schema and remove ExpireTimestamp.
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>
> We also need to bump the version of the group metadata schema to include
> the timestamp of the state change. There are two cases: standalone
> "simple"
> consumers and consumer groups.
>
> 1) For standalone consumers, we'll expire based on the commit timestamp 
of
> the offset message. Internally, the group will be Empty and have no
> transition timestamp, so the expiration criteria is when (now - commit
> timestamp) is greater than the configured retention time.
>
> 2) For consumer groups, we'll expire based on the timestamp that the 
group
> transitioned to Empty.
>
> This way, changing the retention time config will affect all existing
> groups except those from older clients that are requesting an explicit
> retention time. Would that work?
>
> On point #3: as you mentioned, currently there is no "notification"
> > mechanism for GroupMetadataManager in place when a subscription change
> > occurs. The member subscription however is available in the group
> metadata
> > and a poll approach could be used to check group subscriptions on a
> > regular basis and expire stale offsets (if there are topics the group 
no
> > longer is subscribed to). This can be done as part of the offset 
cleanup
> > scheduled task that by default does not run very frequently. Were you
> > thinking of a different method for capturing the subscription change?
>
>
> Yes, I think that can work.  So we would expire offsets for a consumer
> group individually if they have reached the retention time and the group
> is
> not empty, but is no longer subscribed to them. Is that right?
>
>
> Thanks,
> Jason
>
>
>
>
> On Fri, Mar 2, 2018 at 3:36 PM, Vahid S Hashemian
> <vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I'm thinking through some of the details of the KIP with respect to 
your
> > feedback and the decision to keep the expire-timestamp for each group
> > partition in the offset message.
> >
> > On point #1 below: since the expiration timer starts ticking after the
> > group becomes Empty the expire_timestamp of group offsets will be set
> when
> > that transition occurs. In normal cases that expire_timestamp is
> > calculated as "current timestamp" + "broker's offset retention". Then 
if
> > an old client provides a custom retention, we probably need a way to
> store
> > that custom retention (and use it once the group becomes Empty). One
> place
> > to store it is in group metadata message, but the issue is we would be
> > introducing a new field only for backward compatibility (new clients
> don't
> > overwrite the broker's retention), unless we somehow want to support
> this
> > retention on a per-group basis. What do you think?
> >
> > On point #3: as you mentioned, currently there is no "notification"
> > mechanism for GroupMetadataManager in place when a subscription change
> > occurs. The member subscription however is available in the group
> metadata
> > and a poll approach could be used to check group subscriptions on a
> > regular basis and expire stale offsets (if there are topics the group 
no
> > longer is subscribed to). This can be done as part of the offset 
cleanup
> > scheduled task that by default does not run very frequently. Were you
> > thinking of a different method for capturing the subscription change?
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> >
> > From:   Jason Gustafson <ja...@confluent.io>
> > To:     dev@kafka.apache.org
> > Date:   02/18/2018 01:16 PM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Hey Vahid,
> >
> > Sorry for the late response. The KIP looks good. A few comments:
> >
> > 1. I'm not quite sure I understand how you are handling old clients. 
It
> > sounds like you are saying that old clients need to change
> configuration?
> > I'd suggest 1) if an old client requests the default expiration, then 
we
> > use the updated behavior, and 2) if the old client requests a specific
> > expiration, we enforce it from the time the group becomes Empty.
> >
> > 2. Does this require a new version of the group metadata messsage
> format?
> > I
> > think we need to add a new field to indicate the time that the group
> state
> > changed to Empty. This will allow us to resume the expiration timer
> > correctly after a coordinator change. Alternatively, we could reset 
the
> > expiration timeout after every coordinator move, but it would be nice 
to
> > have a definite bound on offset expiration.
> >
> > 3. The question about removal of offsets for partitions which are no
> > longer
> > in use is interesting. At the moment, it's difficult for the 
coordinator
> > to
> > know that a partition is no longer being fetched because it is 
agnostic
> to
> > subscription state (the group coordinator is used for more than just
> > consumer groups). Even if we allow the coordinator to read 
subscription
> > state to tell which topics are no longer being consumed, we might need
> > some
> > additional bookkeeping to keep track of /when/ the consumer stopped
> > subscribing to a particular topic. Or maybe we can reset this 
expiration
> > timer after every coordinator change when the new coordinator reads 
the
> > offsets and group metadata? I am not sure how common this use case is
> and
> > whether it needs to be solved as part of this KIP.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Thanks James for sharing that scenario.
> > >
> > > I agree it makes sense to be able to remove offsets for the topics
> that
> > > are no longer "active" in the group.
> > > I think it becomes important to determine what constitutes that a
> topic
> > is
> > > no longer active: If we use per-partition expiration we would 
manually
> > > choose a retention time that works for the particular scenario.
> > >
> > > That works, but since we are manually intervening and specify a
> > > per-partition retention, why not do the intervention in some other
> way:
> > >
> > > One alternative for this intervention, to favor the simplicity of 
the
> > > suggested protocol in the KIP, is to improve upon the just 
introduced
> > > DELETE_GROUPS API and allow for deletion of offsets of specific 
topics
> > in
> > > the group. This is what the old ZooKeeper based group management
> > supported
> > > anyway, and we would just be leveling the group deletion features of
> the
> > > Kafka-based group management with the ZooKeeper-based one.
> > >
> > > So, instead of deciding in advance when the offsets should be 
removed
> we
> > > would instantly remove them when we are sure that they are no longer
> > > needed.
> > >
> > > Let me know what you think.
> > >
> > > Thanks.
> > > --Vahid
> > >
> > >
> > >
> > > From:   James Cheng <wu...@gmail.com>
> > > To:     dev@kafka.apache.org
> > > Date:   02/01/2018 12:37 AM
> > > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics 
of
> > > Consumer Group Offsets
> > >
> > >
> > >
> > > Vahid,
> > >
> > > Under rejected alternatives, we had decided that we did NOT want to 
do
> > > per-partition expiration, and instead we wait until the entire group
> is
> > > empty and then (after the right time has passed) expire the entire
> group
> > > at once.
> > >
> > > I thought of one scenario that might benefit from per-partition
> > > expiration.
> > >
> > > Let's say I have topics A B C... Z. So, I have 26 topics, all of 
them
> > > single partition, so 26 partitions. Let's say I have mirrormaker
> > mirroring
> > > those 26 topics. The group will then have 26 committed offsets.
> > >
> > > Let's say I then change the whitelist on mirrormaker so that it only
> > > mirrors topic Z, but I keep the same consumer group name. (I imagine
> > that
> > > is a common thing to do?)
> > >
> > > With the proposed design for this KIP, the committed offsets for
> topics
> > A
> > > through Y will stay around as long as this mirroring group name
> exists.
> > >
> > > In the current implementation that already exists (prior to this 
KIP),
> I
> > > belive that committed offsets for topics A through Y will expire.
> > >
> > > How much do we care about this case?
> > >
> > > -James
> > >
> > > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com>
> wrote:
> > > >
> > > > Bumping this as I'd like to see it land...
> > > >
> > > > It's one of the "features" that tends to catch Kafka n00bs 
unawares
> > and
> > > > typically results in message skippage/loss, vs the proposed 
solution
> > is
> > > > much more intuitive behavior.
> > > >
> > > > Plus it's more wire efficient because consumers no longer need to
> > commit
> > > > offsets for partitions that have no new messages just to keep 
those
> > > offsets
> > > > alive.
> > > >
> > > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > >> There has been no further discussion on this KIP for about two
> > months.
> > > >> So I thought I'd provide the scoop hoping it would spark 
additional
> > > >> feedback and move the KIP forward.
> > > >>
> > > >> The KIP proposes a method to preserve group offsets as long as 
the
> > > group
> > > >> is not in Empty state (even when offsets are committed very
> rarely),
> > > and
> > > >> start the offset expiration of the group as soon as the group
> becomes
> > > >> Empty.
> > > >> It suggests dropping the `retention_time` field from the
> > `OffsetCommit`
> > > >> request and, instead, enforcing it via the broker config
> > > >> `offsets.retention.minutes` for all groups. In other words, all
> > groups
> > > >> will have the same retention time.
> > > >> The KIP presumes that this global retention config would suffice
> > common
> > > >> use cases and does not lead to, e.g., unmanageable offset cache
> size
> > > (for
> > > >> groups that don't need to stay around that long). It suggests
> opening
> > > >> another KIP if this global retention setting proves to be
> problematic
> > > in
> > > >> the future. It was suggested earlier in the discussion thread 
that
> > the
> > > KIP
> > > >> should propose a per-group retention config to circumvent this
> risk.
> > > >>
> > > >> I look forward to hearing your thoughts. Thanks!
> > > >>
> > > >> --Vahid
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > > >> To:     dev <de...@kafka.apache.org>
> > > >> Date:   10/18/2017 04:45 PM
> > > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > > Consumer
> > > >> Group Offsets
> > > >>
> > > >>
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I created a KIP to address the group offset expiration issue
> reported
> > > in
> > > >> KAFKA-4682:
> > > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > > >>
> > > >>
> > > >> Your feedback is welcome!
> > > >>
> > > >> Thanks.
> > > >> --Vahid
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > *Jeff Widman*
> > > > jeffwidman.com <
> > > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> > >
> >
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > > 9xkoI&e=
> > > > | 740-WIDMAN-J (943-6265)
> > > > <><
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> >
> >
> >
>
>
>
>
>





Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

Sorry for the delay. I've read through the current KIP and it looks good. I
had one minor suggestion: instead of making the timestamp in the group
metadata state message specific to the empty state transition (i.e.
"empty_state_timestamp"), could we leave it generic and let it indicate the
time of the current state change (whether it is to Empty, Stable, or
whatever)? Once a group becomes empty, we do not update the state until
deletion anyway, so the timestamp would not change.

Also a minor correction. When a group transitions to Dead, it does not
necessarily indicate offsets have removed. We also use this state when
there is a coordinator change and the group is unloaded from the
coordinator cache.

Thanks,
Jason




On Tue, Mar 6, 2018 at 4:41 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> Thanks a lot for your clarification and feedback.
> Your statements below all seem reasonable to me.
>
> I have updated the KIP according to the conversation so far.
> It contains significant changes compared to the initial version, so it
> might be worth glancing over the whole thing one more time in case I've
> missed something :)
>
> Thanks.
> --Vahid
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   03/05/2018 03:42 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> On point #1 below: since the expiration timer starts ticking after the
> > group becomes Empty the expire_timestamp of group offsets will be set
> when
> > that transition occurs. In normal cases that expire_timestamp is
> > calculated as "current timestamp" + "broker's offset retention". Then if
> > an old client provides a custom retention, we probably need a way to
> store
> > that custom retention (and use it once the group becomes Empty). One
> place
> > to store it is in group metadata message, but the issue is we would be
> > introducing a new field only for backward compatibility (new clients
> don't
> > overwrite the broker's retention), unless we somehow want to support
> this
> > retention on a per-group basis. What do you think?
>
>
> Here's what I was thinking. The current offset commit schema looks like
> this:
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>   ExpireTimestmap => Long
>
> If we have any clients that ask for an explicit retention timeout, then we
> can continue using this schema and providing the current behavior. The
> offsets will be retained until they are individually expired.
>
> For newer clients or those that request the default retention, we can bump
> the schema and remove ExpireTimestamp.
>
> OffsetCommit =>
>   Offset => Long
>   Metadata => String
>   CommitTimestamp => Long
>
> We also need to bump the version of the group metadata schema to include
> the timestamp of the state change. There are two cases: standalone
> "simple"
> consumers and consumer groups.
>
> 1) For standalone consumers, we'll expire based on the commit timestamp of
> the offset message. Internally, the group will be Empty and have no
> transition timestamp, so the expiration criteria is when (now - commit
> timestamp) is greater than the configured retention time.
>
> 2) For consumer groups, we'll expire based on the timestamp that the group
> transitioned to Empty.
>
> This way, changing the retention time config will affect all existing
> groups except those from older clients that are requesting an explicit
> retention time. Would that work?
>
> On point #3: as you mentioned, currently there is no "notification"
> > mechanism for GroupMetadataManager in place when a subscription change
> > occurs. The member subscription however is available in the group
> metadata
> > and a poll approach could be used to check group subscriptions on a
> > regular basis and expire stale offsets (if there are topics the group no
> > longer is subscribed to). This can be done as part of the offset cleanup
> > scheduled task that by default does not run very frequently. Were you
> > thinking of a different method for capturing the subscription change?
>
>
> Yes, I think that can work.  So we would expire offsets for a consumer
> group individually if they have reached the retention time and the group
> is
> not empty, but is no longer subscribed to them. Is that right?
>
>
> Thanks,
> Jason
>
>
>
>
> On Fri, Mar 2, 2018 at 3:36 PM, Vahid S Hashemian
> <vahidhashemian@us.ibm.com
> > wrote:
>
> > Hi Jason,
> >
> > I'm thinking through some of the details of the KIP with respect to your
> > feedback and the decision to keep the expire-timestamp for each group
> > partition in the offset message.
> >
> > On point #1 below: since the expiration timer starts ticking after the
> > group becomes Empty the expire_timestamp of group offsets will be set
> when
> > that transition occurs. In normal cases that expire_timestamp is
> > calculated as "current timestamp" + "broker's offset retention". Then if
> > an old client provides a custom retention, we probably need a way to
> store
> > that custom retention (and use it once the group becomes Empty). One
> place
> > to store it is in group metadata message, but the issue is we would be
> > introducing a new field only for backward compatibility (new clients
> don't
> > overwrite the broker's retention), unless we somehow want to support
> this
> > retention on a per-group basis. What do you think?
> >
> > On point #3: as you mentioned, currently there is no "notification"
> > mechanism for GroupMetadataManager in place when a subscription change
> > occurs. The member subscription however is available in the group
> metadata
> > and a poll approach could be used to check group subscriptions on a
> > regular basis and expire stale offsets (if there are topics the group no
> > longer is subscribed to). This can be done as part of the offset cleanup
> > scheduled task that by default does not run very frequently. Were you
> > thinking of a different method for capturing the subscription change?
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> >
> > From:   Jason Gustafson <ja...@confluent.io>
> > To:     dev@kafka.apache.org
> > Date:   02/18/2018 01:16 PM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Hey Vahid,
> >
> > Sorry for the late response. The KIP looks good. A few comments:
> >
> > 1. I'm not quite sure I understand how you are handling old clients. It
> > sounds like you are saying that old clients need to change
> configuration?
> > I'd suggest 1) if an old client requests the default expiration, then we
> > use the updated behavior, and 2) if the old client requests a specific
> > expiration, we enforce it from the time the group becomes Empty.
> >
> > 2. Does this require a new version of the group metadata messsage
> format?
> > I
> > think we need to add a new field to indicate the time that the group
> state
> > changed to Empty. This will allow us to resume the expiration timer
> > correctly after a coordinator change. Alternatively, we could reset the
> > expiration timeout after every coordinator move, but it would be nice to
> > have a definite bound on offset expiration.
> >
> > 3. The question about removal of offsets for partitions which are no
> > longer
> > in use is interesting. At the moment, it's difficult for the coordinator
> > to
> > know that a partition is no longer being fetched because it is agnostic
> to
> > subscription state (the group coordinator is used for more than just
> > consumer groups). Even if we allow the coordinator to read subscription
> > state to tell which topics are no longer being consumed, we might need
> > some
> > additional bookkeeping to keep track of /when/ the consumer stopped
> > subscribing to a particular topic. Or maybe we can reset this expiration
> > timer after every coordinator change when the new coordinator reads the
> > offsets and group metadata? I am not sure how common this use case is
> and
> > whether it needs to be solved as part of this KIP.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Thanks James for sharing that scenario.
> > >
> > > I agree it makes sense to be able to remove offsets for the topics
> that
> > > are no longer "active" in the group.
> > > I think it becomes important to determine what constitutes that a
> topic
> > is
> > > no longer active: If we use per-partition expiration we would manually
> > > choose a retention time that works for the particular scenario.
> > >
> > > That works, but since we are manually intervening and specify a
> > > per-partition retention, why not do the intervention in some other
> way:
> > >
> > > One alternative for this intervention, to favor the simplicity of the
> > > suggested protocol in the KIP, is to improve upon the just introduced
> > > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> > in
> > > the group. This is what the old ZooKeeper based group management
> > supported
> > > anyway, and we would just be leveling the group deletion features of
> the
> > > Kafka-based group management with the ZooKeeper-based one.
> > >
> > > So, instead of deciding in advance when the offsets should be removed
> we
> > > would instantly remove them when we are sure that they are no longer
> > > needed.
> > >
> > > Let me know what you think.
> > >
> > > Thanks.
> > > --Vahid
> > >
> > >
> > >
> > > From:   James Cheng <wu...@gmail.com>
> > > To:     dev@kafka.apache.org
> > > Date:   02/01/2018 12:37 AM
> > > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > > Consumer Group Offsets
> > >
> > >
> > >
> > > Vahid,
> > >
> > > Under rejected alternatives, we had decided that we did NOT want to do
> > > per-partition expiration, and instead we wait until the entire group
> is
> > > empty and then (after the right time has passed) expire the entire
> group
> > > at once.
> > >
> > > I thought of one scenario that might benefit from per-partition
> > > expiration.
> > >
> > > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > > single partition, so 26 partitions. Let's say I have mirrormaker
> > mirroring
> > > those 26 topics. The group will then have 26 committed offsets.
> > >
> > > Let's say I then change the whitelist on mirrormaker so that it only
> > > mirrors topic Z, but I keep the same consumer group name. (I imagine
> > that
> > > is a common thing to do?)
> > >
> > > With the proposed design for this KIP, the committed offsets for
> topics
> > A
> > > through Y will stay around as long as this mirroring group name
> exists.
> > >
> > > In the current implementation that already exists (prior to this KIP),
> I
> > > belive that committed offsets for topics A through Y will expire.
> > >
> > > How much do we care about this case?
> > >
> > > -James
> > >
> > > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com>
> wrote:
> > > >
> > > > Bumping this as I'd like to see it land...
> > > >
> > > > It's one of the "features" that tends to catch Kafka n00bs unawares
> > and
> > > > typically results in message skippage/loss, vs the proposed solution
> > is
> > > > much more intuitive behavior.
> > > >
> > > > Plus it's more wire efficient because consumers no longer need to
> > commit
> > > > offsets for partitions that have no new messages just to keep those
> > > offsets
> > > > alive.
> > > >
> > > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > > vahidhashemian@us.ibm.com> wrote:
> > > >
> > > >> There has been no further discussion on this KIP for about two
> > months.
> > > >> So I thought I'd provide the scoop hoping it would spark additional
> > > >> feedback and move the KIP forward.
> > > >>
> > > >> The KIP proposes a method to preserve group offsets as long as the
> > > group
> > > >> is not in Empty state (even when offsets are committed very
> rarely),
> > > and
> > > >> start the offset expiration of the group as soon as the group
> becomes
> > > >> Empty.
> > > >> It suggests dropping the `retention_time` field from the
> > `OffsetCommit`
> > > >> request and, instead, enforcing it via the broker config
> > > >> `offsets.retention.minutes` for all groups. In other words, all
> > groups
> > > >> will have the same retention time.
> > > >> The KIP presumes that this global retention config would suffice
> > common
> > > >> use cases and does not lead to, e.g., unmanageable offset cache
> size
> > > (for
> > > >> groups that don't need to stay around that long). It suggests
> opening
> > > >> another KIP if this global retention setting proves to be
> problematic
> > > in
> > > >> the future. It was suggested earlier in the discussion thread that
> > the
> > > KIP
> > > >> should propose a per-group retention config to circumvent this
> risk.
> > > >>
> > > >> I look forward to hearing your thoughts. Thanks!
> > > >>
> > > >> --Vahid
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > > >> To:     dev <de...@kafka.apache.org>
> > > >> Date:   10/18/2017 04:45 PM
> > > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > > Consumer
> > > >> Group Offsets
> > > >>
> > > >>
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I created a KIP to address the group offset expiration issue
> reported
> > > in
> > > >> KAFKA-4682:
> > > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > > >>
> > > >>
> > > >> Your feedback is welcome!
> > > >>
> > > >> Thanks.
> > > >> --Vahid
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > *Jeff Widman*
> > > > jeffwidman.com <
> > > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> > >
> >
> 3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > > 9xkoI&e=
> > > > | 740-WIDMAN-J (943-6265)
> > > > <><
> > >
> > >
> > >
> > >
> > >
> > >
> >
> >
> >
> >
> >
>
>
>
>
>

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

Thanks a lot for your clarification and feedback.
Your statements below all seem reasonable to me.

I have updated the KIP according to the conversation so far.
It contains significant changes compared to the initial version, so it 
might be worth glancing over the whole thing one more time in case I've 
missed something :)

Thanks.
--Vahid



From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   03/05/2018 03:42 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty the expire_timestamp of group offsets will be set 
when
> that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to 
store
> that custom retention (and use it once the group becomes Empty). One 
place
> to store it is in group metadata message, but the issue is we would be
> introducing a new field only for backward compatibility (new clients 
don't
> overwrite the broker's retention), unless we somehow want to support 
this
> retention on a per-group basis. What do you think?


Here's what I was thinking. The current offset commit schema looks like
this:

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long
  ExpireTimestmap => Long

If we have any clients that ask for an explicit retention timeout, then we
can continue using this schema and providing the current behavior. The
offsets will be retained until they are individually expired.

For newer clients or those that request the default retention, we can bump
the schema and remove ExpireTimestamp.

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long

We also need to bump the version of the group metadata schema to include
the timestamp of the state change. There are two cases: standalone 
"simple"
consumers and consumer groups.

1) For standalone consumers, we'll expire based on the commit timestamp of
the offset message. Internally, the group will be Empty and have no
transition timestamp, so the expiration criteria is when (now - commit
timestamp) is greater than the configured retention time.

2) For consumer groups, we'll expire based on the timestamp that the group
transitioned to Empty.

This way, changing the retention time config will affect all existing
groups except those from older clients that are requesting an explicit
retention time. Would that work?

On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription however is available in the group 
metadata
> and a poll approach could be used to check group subscriptions on a
> regular basis and expire stale offsets (if there are topics the group no
> longer is subscribed to). This can be done as part of the offset cleanup
> scheduled task that by default does not run very frequently. Were you
> thinking of a different method for capturing the subscription change?


Yes, I think that can work.  So we would expire offsets for a consumer
group individually if they have reached the retention time and the group 
is
not empty, but is no longer subscribed to them. Is that right?


Thanks,
Jason




On Fri, Mar 2, 2018 at 3:36 PM, Vahid S Hashemian 
<vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> I'm thinking through some of the details of the KIP with respect to your
> feedback and the decision to keep the expire-timestamp for each group
> partition in the offset message.
>
> On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty the expire_timestamp of group offsets will be set 
when
> that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to 
store
> that custom retention (and use it once the group becomes Empty). One 
place
> to store it is in group metadata message, but the issue is we would be
> introducing a new field only for backward compatibility (new clients 
don't
> overwrite the broker's retention), unless we somehow want to support 
this
> retention on a per-group basis. What do you think?
>
> On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription however is available in the group 
metadata
> and a poll approach could be used to check group subscriptions on a
> regular basis and expire stale offsets (if there are topics the group no
> longer is subscribed to). This can be done as part of the offset cleanup
> scheduled task that by default does not run very frequently. Were you
> thinking of a different method for capturing the subscription change?
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change 
configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata messsage 
format?
> I
> think we need to add a new field to indicate the time that the group 
state
> changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer
> in use is interesting. At the moment, it's difficult for the coordinator
> to
> know that a partition is no longer being fetched because it is agnostic 
to
> subscription state (the group coordinator is used for more than just
> consumer groups). Even if we allow the coordinator to read subscription
> state to tell which topics are no longer being consumed, we might need
> some
> additional bookkeeping to keep track of /when/ the consumer stopped
> subscribing to a particular topic. Or maybe we can reset this expiration
> timer after every coordinator change when the new coordinator reads the
> offsets and group metadata? I am not sure how common this use case is 
and
> whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics 
that
> > are no longer "active" in the group.
> > I think it becomes important to determine what constitutes that a 
topic
> is
> > no longer active: If we use per-partition expiration we would manually
> > choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specify a
> > per-partition retention, why not do the intervention in some other 
way:
> >
> > One alternative for this intervention, to favor the simplicity of the
> > suggested protocol in the KIP, is to improve upon the just introduced
> > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> in
> > the group. This is what the old ZooKeeper based group management
> supported
> > anyway, and we would just be leveling the group deletion features of 
the
> > Kafka-based group management with the ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed 
we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wu...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group 
is
> > empty and then (after the right time has passed) expire the entire 
group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> mirroring
> > those 26 topics. The group will then have 26 committed offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> that
> > is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for 
topics
> A
> > through Y will stay around as long as this mirroring group name 
exists.
> >
> > In the current implementation that already exists (prior to this KIP), 
I
> > belive that committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> 
wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> and
> > > typically results in message skippage/loss, vs the proposed solution
> is
> > > much more intuitive behavior.
> > >
> > > Plus it's more wire efficient because consumers no longer need to
> commit
> > > offsets for partitions that have no new messages just to keep those
> > offsets
> > > alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> months.
> > >> So I thought I'd provide the scoop hoping it would spark additional
> > >> feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > group
> > >> is not in Empty state (even when offsets are committed very 
rarely),
> > and
> > >> start the offset expiration of the group as soon as the group 
becomes
> > >> Empty.
> > >> It suggests dropping the `retention_time` field from the
> `OffsetCommit`
> > >> request and, instead, enforcing it via the broker config
> > >> `offsets.retention.minutes` for all groups. In other words, all
> groups
> > >> will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice
> common
> > >> use cases and does not lead to, e.g., unmanageable offset cache 
size
> > (for
> > >> groups that don't need to stay around that long). It suggests 
opening
> > >> another KIP if this global retention setting proves to be 
problematic
> > in
> > >> the future. It was suggested earlier in the discussion thread that
> the
> > KIP
> > >> should propose a per-group retention config to circumvent this 
risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > >> To:     dev <de...@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue 
reported
> > in
> > >> KAFKA-4682:
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> >
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > 9xkoI&e=
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>





Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty the expire_timestamp of group offsets will be set when
> that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to store
> that custom retention (and use it once the group becomes Empty). One place
> to store it is in group metadata message, but the issue is we would be
> introducing a new field only for backward compatibility (new clients don't
> overwrite the broker's retention), unless we somehow want to support this
> retention on a per-group basis. What do you think?


Here's what I was thinking. The current offset commit schema looks like
this:

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long
  ExpireTimestmap => Long

If we have any clients that ask for an explicit retention timeout, then we
can continue using this schema and providing the current behavior. The
offsets will be retained until they are individually expired.

For newer clients or those that request the default retention, we can bump
the schema and remove ExpireTimestamp.

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long

We also need to bump the version of the group metadata schema to include
the timestamp of the state change. There are two cases: standalone "simple"
consumers and consumer groups.

1) For standalone consumers, we'll expire based on the commit timestamp of
the offset message. Internally, the group will be Empty and have no
transition timestamp, so the expiration criteria is when (now - commit
timestamp) is greater than the configured retention time.

2) For consumer groups, we'll expire based on the timestamp that the group
transitioned to Empty.

This way, changing the retention time config will affect all existing
groups except those from older clients that are requesting an explicit
retention time. Would that work?

On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription however is available in the group metadata
> and a poll approach could be used to check group subscriptions on a
> regular basis and expire stale offsets (if there are topics the group no
> longer is subscribed to). This can be done as part of the offset cleanup
> scheduled task that by default does not run very frequently. Were you
> thinking of a different method for capturing the subscription change?


Yes, I think that can work.  So we would expire offsets for a consumer
group individually if they have reached the retention time and the group is
not empty, but is no longer subscribed to them. Is that right?


Thanks,
Jason




On Fri, Mar 2, 2018 at 3:36 PM, Vahid S Hashemian <vahidhashemian@us.ibm.com
> wrote:

> Hi Jason,
>
> I'm thinking through some of the details of the KIP with respect to your
> feedback and the decision to keep the expire-timestamp for each group
> partition in the offset message.
>
> On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty the expire_timestamp of group offsets will be set when
> that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to store
> that custom retention (and use it once the group becomes Empty). One place
> to store it is in group metadata message, but the issue is we would be
> introducing a new field only for backward compatibility (new clients don't
> overwrite the broker's retention), unless we somehow want to support this
> retention on a per-group basis. What do you think?
>
> On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription however is available in the group metadata
> and a poll approach could be used to check group subscriptions on a
> regular basis and expire stale offsets (if there are topics the group no
> longer is subscribed to). This can be done as part of the offset cleanup
> scheduled task that by default does not run very frequently. Were you
> thinking of a different method for capturing the subscription change?
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata messsage format?
> I
> think we need to add a new field to indicate the time that the group state
> changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer
> in use is interesting. At the moment, it's difficult for the coordinator
> to
> know that a partition is no longer being fetched because it is agnostic to
> subscription state (the group coordinator is used for more than just
> consumer groups). Even if we allow the coordinator to read subscription
> state to tell which topics are no longer being consumed, we might need
> some
> additional bookkeeping to keep track of /when/ the consumer stopped
> subscribing to a particular topic. Or maybe we can reset this expiration
> timer after every coordinator change when the new coordinator reads the
> offsets and group metadata? I am not sure how common this use case is and
> whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics that
> > are no longer "active" in the group.
> > I think it becomes important to determine what constitutes that a topic
> is
> > no longer active: If we use per-partition expiration we would manually
> > choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specify a
> > per-partition retention, why not do the intervention in some other way:
> >
> > One alternative for this intervention, to favor the simplicity of the
> > suggested protocol in the KIP, is to improve upon the just introduced
> > DELETE_GROUPS API and allow for deletion of offsets of specific topics
> in
> > the group. This is what the old ZooKeeper based group management
> supported
> > anyway, and we would just be leveling the group deletion features of the
> > Kafka-based group management with the ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wu...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group is
> > empty and then (after the right time has passed) expire the entire group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> mirroring
> > those 26 topics. The group will then have 26 committed offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> that
> > is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for topics
> A
> > through Y will stay around as long as this mirroring group name exists.
> >
> > In the current implementation that already exists (prior to this KIP), I
> > belive that committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> and
> > > typically results in message skippage/loss, vs the proposed solution
> is
> > > much more intuitive behavior.
> > >
> > > Plus it's more wire efficient because consumers no longer need to
> commit
> > > offsets for partitions that have no new messages just to keep those
> > offsets
> > > alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> months.
> > >> So I thought I'd provide the scoop hoping it would spark additional
> > >> feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > group
> > >> is not in Empty state (even when offsets are committed very rarely),
> > and
> > >> start the offset expiration of the group as soon as the group becomes
> > >> Empty.
> > >> It suggests dropping the `retention_time` field from the
> `OffsetCommit`
> > >> request and, instead, enforcing it via the broker config
> > >> `offsets.retention.minutes` for all groups. In other words, all
> groups
> > >> will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice
> common
> > >> use cases and does not lead to, e.g., unmanageable offset cache size
> > (for
> > >> groups that don't need to stay around that long). It suggests opening
> > >> another KIP if this global retention setting proves to be problematic
> > in
> > >> the future. It was suggested earlier in the discussion thread that
> the
> > KIP
> > >> should propose a per-group retention config to circumvent this risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> > >> To:     dev <de...@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue reported
> > in
> > >> KAFKA-4682:
> > >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> > >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> > >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> > >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> > >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> > jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> > itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> >
> 3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> > 9xkoI&e=
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Hi Jason,

I'm thinking through some of the details of the KIP with respect to your 
feedback and the decision to keep the expire-timestamp for each group 
partition in the offset message.

On point #1 below: since the expiration timer starts ticking after the 
group becomes Empty the expire_timestamp of group offsets will be set when 
that transition occurs. In normal cases that expire_timestamp is 
calculated as "current timestamp" + "broker's offset retention". Then if 
an old client provides a custom retention, we probably need a way to store 
that custom retention (and use it once the group becomes Empty). One place 
to store it is in group metadata message, but the issue is we would be 
introducing a new field only for backward compatibility (new clients don't 
overwrite the broker's retention), unless we somehow want to support this 
retention on a per-group basis. What do you think?

On point #3: as you mentioned, currently there is no "notification" 
mechanism for GroupMetadataManager in place when a subscription change 
occurs. The member subscription however is available in the group metadata 
and a poll approach could be used to check group subscriptions on a 
regular basis and expire stale offsets (if there are topics the group no 
longer is subscribed to). This can be done as part of the offset cleanup 
scheduled task that by default does not run very frequently. Were you 
thinking of a different method for capturing the subscription change?

Thanks.
--Vahid




From:   Jason Gustafson <ja...@confluent.io>
To:     dev@kafka.apache.org
Date:   02/18/2018 01:16 PM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Hey Vahid,

Sorry for the late response. The KIP looks good. A few comments:

1. I'm not quite sure I understand how you are handling old clients. It
sounds like you are saying that old clients need to change configuration?
I'd suggest 1) if an old client requests the default expiration, then we
use the updated behavior, and 2) if the old client requests a specific
expiration, we enforce it from the time the group becomes Empty.

2. Does this require a new version of the group metadata messsage format? 
I
think we need to add a new field to indicate the time that the group state
changed to Empty. This will allow us to resume the expiration timer
correctly after a coordinator change. Alternatively, we could reset the
expiration timeout after every coordinator move, but it would be nice to
have a definite bound on offset expiration.

3. The question about removal of offsets for partitions which are no 
longer
in use is interesting. At the moment, it's difficult for the coordinator 
to
know that a partition is no longer being fetched because it is agnostic to
subscription state (the group coordinator is used for more than just
consumer groups). Even if we allow the coordinator to read subscription
state to tell which topics are no longer being consumed, we might need 
some
additional bookkeeping to keep track of /when/ the consumer stopped
subscribing to a particular topic. Or maybe we can reset this expiration
timer after every coordinator change when the new coordinator reads the
offsets and group metadata? I am not sure how common this use case is and
whether it needs to be solved as part of this KIP.

Thanks,
Jason



On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Thanks James for sharing that scenario.
>
> I agree it makes sense to be able to remove offsets for the topics that
> are no longer "active" in the group.
> I think it becomes important to determine what constitutes that a topic 
is
> no longer active: If we use per-partition expiration we would manually
> choose a retention time that works for the particular scenario.
>
> That works, but since we are manually intervening and specify a
> per-partition retention, why not do the intervention in some other way:
>
> One alternative for this intervention, to favor the simplicity of the
> suggested protocol in the KIP, is to improve upon the just introduced
> DELETE_GROUPS API and allow for deletion of offsets of specific topics 
in
> the group. This is what the old ZooKeeper based group management 
supported
> anyway, and we would just be leveling the group deletion features of the
> Kafka-based group management with the ZooKeeper-based one.
>
> So, instead of deciding in advance when the offsets should be removed we
> would instantly remove them when we are sure that they are no longer
> needed.
>
> Let me know what you think.
>
> Thanks.
> --Vahid
>
>
>
> From:   James Cheng <wu...@gmail.com>
> To:     dev@kafka.apache.org
> Date:   02/01/2018 12:37 AM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Vahid,
>
> Under rejected alternatives, we had decided that we did NOT want to do
> per-partition expiration, and instead we wait until the entire group is
> empty and then (after the right time has passed) expire the entire group
> at once.
>
> I thought of one scenario that might benefit from per-partition
> expiration.
>
> Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> single partition, so 26 partitions. Let's say I have mirrormaker 
mirroring
> those 26 topics. The group will then have 26 committed offsets.
>
> Let's say I then change the whitelist on mirrormaker so that it only
> mirrors topic Z, but I keep the same consumer group name. (I imagine 
that
> is a common thing to do?)
>
> With the proposed design for this KIP, the committed offsets for topics 
A
> through Y will stay around as long as this mirroring group name exists.
>
> In the current implementation that already exists (prior to this KIP), I
> belive that committed offsets for topics A through Y will expire.
>
> How much do we care about this case?
>
> -James
>
> > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> >
> > Bumping this as I'd like to see it land...
> >
> > It's one of the "features" that tends to catch Kafka n00bs unawares 
and
> > typically results in message skippage/loss, vs the proposed solution 
is
> > much more intuitive behavior.
> >
> > Plus it's more wire efficient because consumers no longer need to 
commit
> > offsets for partitions that have no new messages just to keep those
> offsets
> > alive.
> >
> > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> >> There has been no further discussion on this KIP for about two 
months.
> >> So I thought I'd provide the scoop hoping it would spark additional
> >> feedback and move the KIP forward.
> >>
> >> The KIP proposes a method to preserve group offsets as long as the
> group
> >> is not in Empty state (even when offsets are committed very rarely),
> and
> >> start the offset expiration of the group as soon as the group becomes
> >> Empty.
> >> It suggests dropping the `retention_time` field from the 
`OffsetCommit`
> >> request and, instead, enforcing it via the broker config
> >> `offsets.retention.minutes` for all groups. In other words, all 
groups
> >> will have the same retention time.
> >> The KIP presumes that this global retention config would suffice 
common
> >> use cases and does not lead to, e.g., unmanageable offset cache size
> (for
> >> groups that don't need to stay around that long). It suggests opening
> >> another KIP if this global retention setting proves to be problematic
> in
> >> the future. It was suggested earlier in the discussion thread that 
the
> KIP
> >> should propose a per-group retention config to circumvent this risk.
> >>
> >> I look forward to hearing your thoughts. Thanks!
> >>
> >> --Vahid
> >>
> >>
> >>
> >>
> >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> >> To:     dev <de...@kafka.apache.org>
> >> Date:   10/18/2017 04:45 PM
> >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer
> >> Group Offsets
> >>
> >>
> >>
> >> Hi all,
> >>
> >> I created a KIP to address the group offset expiration issue reported
> in
> >> KAFKA-4682:
> >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> >>
> >>
> >> Your feedback is welcome!
> >>
> >> Thanks.
> >> --Vahid
> >>
> >>
> >>
> >>
> >>
> >>
> >
> >
> > --
> >
> > *Jeff Widman*
> > jeffwidman.com <
> https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> 
3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> 9xkoI&e=
> > | 740-WIDMAN-J (943-6265)
> > <><
>
>
>
>
>
>





Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Vahid,

Sorry for the late response. The KIP looks good. A few comments:

1. I'm not quite sure I understand how you are handling old clients. It
sounds like you are saying that old clients need to change configuration?
I'd suggest 1) if an old client requests the default expiration, then we
use the updated behavior, and 2) if the old client requests a specific
expiration, we enforce it from the time the group becomes Empty.

2. Does this require a new version of the group metadata messsage format? I
think we need to add a new field to indicate the time that the group state
changed to Empty. This will allow us to resume the expiration timer
correctly after a coordinator change. Alternatively, we could reset the
expiration timeout after every coordinator move, but it would be nice to
have a definite bound on offset expiration.

3. The question about removal of offsets for partitions which are no longer
in use is interesting. At the moment, it's difficult for the coordinator to
know that a partition is no longer being fetched because it is agnostic to
subscription state (the group coordinator is used for more than just
consumer groups). Even if we allow the coordinator to read subscription
state to tell which topics are no longer being consumed, we might need some
additional bookkeeping to keep track of /when/ the consumer stopped
subscribing to a particular topic. Or maybe we can reset this expiration
timer after every coordinator change when the new coordinator reads the
offsets and group metadata? I am not sure how common this use case is and
whether it needs to be solved as part of this KIP.

Thanks,
Jason



On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Thanks James for sharing that scenario.
>
> I agree it makes sense to be able to remove offsets for the topics that
> are no longer "active" in the group.
> I think it becomes important to determine what constitutes that a topic is
> no longer active: If we use per-partition expiration we would manually
> choose a retention time that works for the particular scenario.
>
> That works, but since we are manually intervening and specify a
> per-partition retention, why not do the intervention in some other way:
>
> One alternative for this intervention, to favor the simplicity of the
> suggested protocol in the KIP, is to improve upon the just introduced
> DELETE_GROUPS API and allow for deletion of offsets of specific topics in
> the group. This is what the old ZooKeeper based group management supported
> anyway, and we would just be leveling the group deletion features of the
> Kafka-based group management with the ZooKeeper-based one.
>
> So, instead of deciding in advance when the offsets should be removed we
> would instantly remove them when we are sure that they are no longer
> needed.
>
> Let me know what you think.
>
> Thanks.
> --Vahid
>
>
>
> From:   James Cheng <wu...@gmail.com>
> To:     dev@kafka.apache.org
> Date:   02/01/2018 12:37 AM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Vahid,
>
> Under rejected alternatives, we had decided that we did NOT want to do
> per-partition expiration, and instead we wait until the entire group is
> empty and then (after the right time has passed) expire the entire group
> at once.
>
> I thought of one scenario that might benefit from per-partition
> expiration.
>
> Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> single partition, so 26 partitions. Let's say I have mirrormaker mirroring
> those 26 topics. The group will then have 26 committed offsets.
>
> Let's say I then change the whitelist on mirrormaker so that it only
> mirrors topic Z, but I keep the same consumer group name. (I imagine that
> is a common thing to do?)
>
> With the proposed design for this KIP, the committed offsets for topics A
> through Y will stay around as long as this mirroring group name exists.
>
> In the current implementation that already exists (prior to this KIP), I
> belive that committed offsets for topics A through Y will expire.
>
> How much do we care about this case?
>
> -James
>
> > On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> >
> > Bumping this as I'd like to see it land...
> >
> > It's one of the "features" that tends to catch Kafka n00bs unawares and
> > typically results in message skippage/loss, vs the proposed solution is
> > much more intuitive behavior.
> >
> > Plus it's more wire efficient because consumers no longer need to commit
> > offsets for partitions that have no new messages just to keep those
> offsets
> > alive.
> >
> > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> >> There has been no further discussion on this KIP for about two months.
> >> So I thought I'd provide the scoop hoping it would spark additional
> >> feedback and move the KIP forward.
> >>
> >> The KIP proposes a method to preserve group offsets as long as the
> group
> >> is not in Empty state (even when offsets are committed very rarely),
> and
> >> start the offset expiration of the group as soon as the group becomes
> >> Empty.
> >> It suggests dropping the `retention_time` field from the `OffsetCommit`
> >> request and, instead, enforcing it via the broker config
> >> `offsets.retention.minutes` for all groups. In other words, all groups
> >> will have the same retention time.
> >> The KIP presumes that this global retention config would suffice common
> >> use cases and does not lead to, e.g., unmanageable offset cache size
> (for
> >> groups that don't need to stay around that long). It suggests opening
> >> another KIP if this global retention setting proves to be problematic
> in
> >> the future. It was suggested earlier in the discussion thread that the
> KIP
> >> should propose a per-group retention config to circumvent this risk.
> >>
> >> I look forward to hearing your thoughts. Thanks!
> >>
> >> --Vahid
> >>
> >>
> >>
> >>
> >> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> >> To:     dev <de...@kafka.apache.org>
> >> Date:   10/18/2017 04:45 PM
> >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer
> >> Group Offsets
> >>
> >>
> >>
> >> Hi all,
> >>
> >> I created a KIP to address the group offset expiration issue reported
> in
> >> KAFKA-4682:
> >> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> >> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> >> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> >> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> >> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> >> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
> >>
> >>
> >> Your feedback is welcome!
> >>
> >> Thanks.
> >> --Vahid
> >>
> >>
> >>
> >>
> >>
> >>
> >
> >
> > --
> >
> > *Jeff Widman*
> > jeffwidman.com <
> https://urldefense.proofpoint.com/v2/url?u=http-3A__www.
> jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_
> itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-
> 3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n
> 9xkoI&e=
> > | 740-WIDMAN-J (943-6265)
> > <><
>
>
>
>
>
>

Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Vahid S Hashemian <va...@us.ibm.com>.
Thanks James for sharing that scenario.

I agree it makes sense to be able to remove offsets for the topics that 
are no longer "active" in the group.
I think it becomes important to determine what constitutes that a topic is 
no longer active: If we use per-partition expiration we would manually 
choose a retention time that works for the particular scenario.

That works, but since we are manually intervening and specify a 
per-partition retention, why not do the intervention in some other way:

One alternative for this intervention, to favor the simplicity of the 
suggested protocol in the KIP, is to improve upon the just introduced 
DELETE_GROUPS API and allow for deletion of offsets of specific topics in 
the group. This is what the old ZooKeeper based group management supported 
anyway, and we would just be leveling the group deletion features of the 
Kafka-based group management with the ZooKeeper-based one.

So, instead of deciding in advance when the offsets should be removed we 
would instantly remove them when we are sure that they are no longer 
needed.

Let me know what you think.

Thanks.
--Vahid



From:   James Cheng <wu...@gmail.com>
To:     dev@kafka.apache.org
Date:   02/01/2018 12:37 AM
Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer Group Offsets



Vahid,

Under rejected alternatives, we had decided that we did NOT want to do 
per-partition expiration, and instead we wait until the entire group is 
empty and then (after the right time has passed) expire the entire group 
at once.

I thought of one scenario that might benefit from per-partition 
expiration.

Let's say I have topics A B C... Z. So, I have 26 topics, all of them 
single partition, so 26 partitions. Let's say I have mirrormaker mirroring 
those 26 topics. The group will then have 26 committed offsets.

Let's say I then change the whitelist on mirrormaker so that it only 
mirrors topic Z, but I keep the same consumer group name. (I imagine that 
is a common thing to do?)

With the proposed design for this KIP, the committed offsets for topics A 
through Y will stay around as long as this mirroring group name exists.

In the current implementation that already exists (prior to this KIP), I 
belive that committed offsets for topics A through Y will expire.

How much do we care about this case?

-James

> On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> 
> Bumping this as I'd like to see it land...
> 
> It's one of the "features" that tends to catch Kafka n00bs unawares and
> typically results in message skippage/loss, vs the proposed solution is
> much more intuitive behavior.
> 
> Plus it's more wire efficient because consumers no longer need to commit
> offsets for partitions that have no new messages just to keep those 
offsets
> alive.
> 
> On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
> 
>> There has been no further discussion on this KIP for about two months.
>> So I thought I'd provide the scoop hoping it would spark additional
>> feedback and move the KIP forward.
>> 
>> The KIP proposes a method to preserve group offsets as long as the 
group
>> is not in Empty state (even when offsets are committed very rarely), 
and
>> start the offset expiration of the group as soon as the group becomes
>> Empty.
>> It suggests dropping the `retention_time` field from the `OffsetCommit`
>> request and, instead, enforcing it via the broker config
>> `offsets.retention.minutes` for all groups. In other words, all groups
>> will have the same retention time.
>> The KIP presumes that this global retention config would suffice common
>> use cases and does not lead to, e.g., unmanageable offset cache size 
(for
>> groups that don't need to stay around that long). It suggests opening
>> another KIP if this global retention setting proves to be problematic 
in
>> the future. It was suggested earlier in the discussion thread that the 
KIP
>> should propose a per-group retention config to circumvent this risk.
>> 
>> I look forward to hearing your thoughts. Thanks!
>> 
>> --Vahid
>> 
>> 
>> 
>> 
>> From:   "Vahid S Hashemian" <va...@us.ibm.com>
>> To:     dev <de...@kafka.apache.org>
>> Date:   10/18/2017 04:45 PM
>> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of 
Consumer
>> Group Offsets
>> 
>> 
>> 
>> Hi all,
>> 
>> I created a KIP to address the group offset expiration issue reported 
in
>> KAFKA-4682:
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
>> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
>> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
>> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
>> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
>> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
>> 
>> 
>> Your feedback is welcome!
>> 
>> Thanks.
>> --Vahid
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
> -- 
> 
> *Jeff Widman*
> jeffwidman.com <
https://urldefense.proofpoint.com/v2/url?u=http-3A__www.jeffwidman.com_&d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc&m=TpGQEOVxEJ-3Y4rXTBP5CW7iUmO3PYKt_WeEDomWkAs&s=WXmFA_R-gtKbl9KgDfhHB4flnaBUB5C0ypRy4n9xkoI&e=
> | 740-WIDMAN-J (943-6265)
> <><






Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by James Cheng <wu...@gmail.com>.
Vahid,

Under rejected alternatives, we had decided that we did NOT want to do per-partition expiration, and instead we wait until the entire group is empty and then (after the right time has passed) expire the entire group at once.

I thought of one scenario that might benefit from per-partition expiration.

Let's say I have topics A B C... Z. So, I have 26 topics, all of them single partition, so 26 partitions. Let's say I have mirrormaker mirroring those 26 topics. The group will then have 26 committed offsets.

Let's say I then change the whitelist on mirrormaker so that it only mirrors topic Z, but I keep the same consumer group name. (I imagine that is a common thing to do?)

With the proposed design for this KIP, the committed offsets for topics A through Y will stay around as long as this mirroring group name exists.

In the current implementation that already exists (prior to this KIP), I belive that committed offsets for topics A through Y will expire.

How much do we care about this case?

-James

> On Jan 23, 2018, at 11:44 PM, Jeff Widman <je...@jeffwidman.com> wrote:
> 
> Bumping this as I'd like to see it land...
> 
> It's one of the "features" that tends to catch Kafka n00bs unawares and
> typically results in message skippage/loss, vs the proposed solution is
> much more intuitive behavior.
> 
> Plus it's more wire efficient because consumers no longer need to commit
> offsets for partitions that have no new messages just to keep those offsets
> alive.
> 
> On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> vahidhashemian@us.ibm.com> wrote:
> 
>> There has been no further discussion on this KIP for about two months.
>> So I thought I'd provide the scoop hoping it would spark additional
>> feedback and move the KIP forward.
>> 
>> The KIP proposes a method to preserve group offsets as long as the group
>> is not in Empty state (even when offsets are committed very rarely), and
>> start the offset expiration of the group as soon as the group becomes
>> Empty.
>> It suggests dropping the `retention_time` field from the `OffsetCommit`
>> request and, instead, enforcing it via the broker config
>> `offsets.retention.minutes` for all groups. In other words, all groups
>> will have the same retention time.
>> The KIP presumes that this global retention config would suffice common
>> use cases and does not lead to, e.g., unmanageable offset cache size (for
>> groups that don't need to stay around that long). It suggests opening
>> another KIP if this global retention setting proves to be problematic in
>> the future. It was suggested earlier in the discussion thread that the KIP
>> should propose a per-group retention config to circumvent this risk.
>> 
>> I look forward to hearing your thoughts. Thanks!
>> 
>> --Vahid
>> 
>> 
>> 
>> 
>> From:   "Vahid S Hashemian" <va...@us.ibm.com>
>> To:     dev <de...@kafka.apache.org>
>> Date:   10/18/2017 04:45 PM
>> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer
>> Group Offsets
>> 
>> 
>> 
>> Hi all,
>> 
>> I created a KIP to address the group offset expiration issue reported in
>> KAFKA-4682:
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
>> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
>> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
>> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
>> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
>> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
>> 
>> 
>> Your feedback is welcome!
>> 
>> Thanks.
>> --Vahid
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
> -- 
> 
> *Jeff Widman*
> jeffwidman.com <http://www.jeffwidman.com/> | 740-WIDMAN-J (943-6265)
> <><


Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Posted by Jeff Widman <je...@jeffwidman.com>.
Bumping this as I'd like to see it land...

It's one of the "features" that tends to catch Kafka n00bs unawares and
typically results in message skippage/loss, vs the proposed solution is
much more intuitive behavior.

Plus it's more wire efficient because consumers no longer need to commit
offsets for partitions that have no new messages just to keep those offsets
alive.

On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> There has been no further discussion on this KIP for about two months.
> So I thought I'd provide the scoop hoping it would spark additional
> feedback and move the KIP forward.
>
> The KIP proposes a method to preserve group offsets as long as the group
> is not in Empty state (even when offsets are committed very rarely), and
> start the offset expiration of the group as soon as the group becomes
> Empty.
> It suggests dropping the `retention_time` field from the `OffsetCommit`
> request and, instead, enforcing it via the broker config
> `offsets.retention.minutes` for all groups. In other words, all groups
> will have the same retention time.
> The KIP presumes that this global retention config would suffice common
> use cases and does not lead to, e.g., unmanageable offset cache size (for
> groups that don't need to stay around that long). It suggests opening
> another KIP if this global retention setting proves to be problematic in
> the future. It was suggested earlier in the discussion thread that the KIP
> should propose a per-group retention config to circumvent this risk.
>
> I look forward to hearing your thoughts. Thanks!
>
> --Vahid
>
>
>
>
> From:   "Vahid S Hashemian" <va...@us.ibm.com>
> To:     dev <de...@kafka.apache.org>
> Date:   10/18/2017 04:45 PM
> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer
> Group Offsets
>
>
>
> Hi all,
>
> I created a KIP to address the group offset expiration issue reported in
> KAFKA-4682:
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> apache.org_confluence_display_KAFKA_KIP-2D211-253A-2BRevise-
> 2BExpiration-2BSemantics-2Bof-2BConsumer-2BGroup-2BOffsets&
> d=DwIFAg&c=jf_iaSHvJObTbx-siA1ZOg&r=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> kjJc7uSVcviKUc&m=JkzH_2jfSMhCUPMk3rUasrjDAId6xbAEmX7_shSYdU4&s=
> UBu7D2Obulg0fterYxL5m8xrDWkF_O2kGlygTCWsfFc&e=
>
>
> Your feedback is welcome!
>
> Thanks.
> --Vahid
>
>
>
>
>
>


-- 

*Jeff Widman*
jeffwidman.com <http://www.jeffwidman.com/> | 740-WIDMAN-J (943-6265)
<><