Posted to dev@kafka.apache.org by 东方甲乙 <25...@qq.com> on 2016/10/30 09:24:35 UTC

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi All, 


As per our discussion, there are two ways to clean the consumed log:


1) Use an admin tool to find the min commit offset for some topics across a specified set of consumer groups, then send the trim request to all the replicas on the brokers;
the brokers will then start to trim the log segments of these topics.


The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it makes it more complicated for users to clean up all the messages that have been consumed.


2) The broker periodically performs the consumed log retention, as the KIP describes. This method is simple for the users and automatically cleans the consumed log, but it adds more query work to the brokers.
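Either way, the core computation is the same: the highest offset that is safe to trim is the minimum committed offset across the consumer groups we care about. A minimal sketch in Python (`min_commit_offset` is a hypothetical helper, not a Kafka API; a real admin tool would obtain per-group offsets via the OffsetFetch API and then issue the trim request):

```python
# Hypothetical sketch: given the committed offset of each interested consumer
# group for one topic-partition, the highest offset that is safe to trim to
# is the minimum of those committed offsets.

def min_commit_offset(group_offsets):
    """group_offsets: {group_id: committed offset} for one topic-partition.
    Returns None if no group has committed yet (nothing is safe to trim)."""
    if not group_offsets:
        return None
    return min(group_offsets.values())

# Example: CG2 lags behind, so only offsets below 95 may be trimmed.
print(min_commit_offset({"CG1": 120, "CG2": 95, "CG3": 300}))  # 95
```

Option 1 runs this in the admin tool; option 2 would run the same computation periodically inside the broker.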


Which method is better?


Thanks,
David








------------------ Original Message ------------------
From: "Mayuresh Gharat";<gh...@gmail.com>;
Date: Saturday, October 29, 2016, 1:43 AM
To: "dev"<de...@kafka.apache.org>;

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



I do agree with Guozhang on having applications request an external (admin)
service that talks to Kafka for trimming, in which case this external service
can check whether it is OK to send the trim request to the Kafka brokers based
on certain conditions.
On the broker side we could add authorization, perhaps by way of ACLs, so that
only this external admin service is allowed to call trim(). In this way we can
actually move the main decision-making process out of core.

Thanks,

Mayuresh

On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Yes trim() should be an admin API and, if security is concerned, it should
> be under admin authorization as well.
>
> For applications that need this feature, it then boils down to the problem
> that they should request the authorization token from whoever operates Kafka
> before starting their app to use in their own client, which I think is a
> feasible requirement.
>
>
> Guozhang
>
>
> On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <
> gharatmayuresh15@gmail.com
> > wrote:
>
> > Hi Guozhang,
> >
> > I agree that pushing the complexity of coordination out to the client
> > application makes things simpler for the broker, in the sense that it does
> > not have to be the decision maker regarding when to trim and up to what
> > offset. And I agree that if we go in this direction, providing an offset
> > parameter makes sense.
> >
> >
> > But since the main motivation for this seems like saving or reclaiming
> the
> > disk space on broker side, I am not 100% sure how good it is to rely on
> the
> > client application to be a good citizen and call the trim API.
> > Also, I see the trim() API as more of an admin API than a client API.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Here are my thoughts:
> > >
> > > If there are indeed multiple consumer groups on the same topic that need
> > > to coordinate, it is equally complex if the coordination is on the
> broker
> > > or among the applications themselves: for the latter case, you would
> > > imagine some coordination services used (like ZK) to register groups
> for
> > > that topic and let these groups agree upon the minimum offset that is
> > safe
> > > to trim for all of them; for the former case, we just need to move this
> > > coordination service into the broker side, which to me is not a good
> > design
> > > under the principle of making broker simple.
> > >
> > > And as we discussed, there are scenarios where the offset to trim is
> not
> > > necessarily dependent on the committed offsets, even if the topic is
> only
> > > consumed by a single consumer group and we do not need any
> coordination.
> > So
> > > I think it is appropriate to require an "offset parameter" in the trim
> > API.
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin <be...@gmail.com>
> > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I think the trim() interface is generally useful. What I was
> wondering
> > is
> > > > the following:
> > > > if the user has multiple applications to coordinate, it seems simpler
> > for
> > > > the broker to coordinate instead of asking the applications to
> > coordinate
> > > > among themselves. If we let the broker do the coordination and do not
> > > want
> > > > to reuse committed offset for trim(), we kind of need something like
> > > > "offset for trim", which do not seems to be general enough to have.
> But
> > > if
> > > > there is a single application then we don't need to worry about the
> > > > coordination hence this is no longer a problem.
> > > >
> > > > The use case for multiple consumer groups I am thinking of is some kind
> > > > of fork in the DAG, i.e. one intermediate result stream used by multiple
> > > > downstream jobs. But that may not be a big deal if the processing is
> > > within
> > > > the same application.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Becket,
> > > > >
> > > > > I am not 100 percent sure I get your points, reading the first half
> > of
> > > > the
> > > > > paragraph I thought we were on the same page that "the committed
> > > offsets
> > > > > and the offsets the applications ( most likely the consumers) would
> > > like
> > > > to
> > > > > tell the brokers to trim to, could be totally different", but then
> > you
> > > > said
> > > > > "not sure if the requirement ... is general enough", which confused
> > me
> > > a
> > > > > bit :) Anyways, I think the consumer committed offsets should be
> > > > separated
> > > > > from whatever the proposed APIs for telling the brokers to safely
> > trim
> > > > > their logs since they will not be read any more. And Jun also made
> a
> > > good
> > > > > point about that regarding the replay scenarios, which also applies
> > for
> > > > > users who do not require the flexibility as you mentioned.
> > > > >
> > > > > Regarding the coordination complexity among applications
> themselves,
> > my
> > > > gut
> > > > > feeling is that, in practice, this feature would be mostly used
> when
> > > the
> > > > > topic is solely consumed by only one group, and for cases where the
> > > topic
> > > > > is going to be consumed by multiple groups, this feature would be less
> > > > > likely applicable. And if there are indeed such cases, coordination
> > > > > cannot be avoided, since otherwise how can a consumer group (hence a dev
> team
> > /
> > > > > project / etc) tell if the other group is OK with trimming the
> data?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin <be...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > The trim() interface would be useful in general. And I agree with
> > > > > Guozhang
> > > > > > that conceptually letting the application decide when to
> delete
> > > the
> > > > > > messages is more intuitive and flexible.
> > > > > >
> > > > > > That said, I am not sure if putting coordination on the
> application
> > > > side
> > > > > is
> > > > > > the best option. At a high level, there are two things to be
> done:
> > > > > > 1. Coordinate among all the interested consumer groups.
> > > > > > 2. Telling the brokers to trim the log
> > > > > >
> > > > > > For (1), letting different applications coordinate among
> themselves
> > > is
> > > > > more
> > > > > > involved, and this logic may have to be implemented by different
> > > > > > applications. As Guozhang mentioned, the most intuitive way may
> be
> > > > > looking
> > > > > > at the committed offset for each of the groups. But the
> > applications
> > > > may
> > > > > > still need to coordinate among themselves to avoid split-brain
> > > issues.
> > > > > If
> > > > > > there are many consumers from different applications, the brokers
> > may
> > > > > > potentially see a lot of offset queries. So, while letting the
> > > consumer
> > > > > > groups coordinate among themselves provides flexibility, it
> doesn't
> > > > look
> > > > > > simpler overall. There seems to be a trade-off between ease of use
> > and
> > > > > > flexibility. For people who require flexibility, consumer side
> > > > > coordination
> > > > > > + trim() interface is the way to go. But for people who don't
> > require
> > > > > that,
> > > > > > committed offset based retention seems simpler and does not need
> > any
> > > > > client
> > > > > > side code change.
> > > > > >
> > > > > > For (2), in the current approach, the consumers tell the broker
> > their
> > > > > > positions by committing offsets. If we use trim(), it would be
> more
> > > > > > explicit. I am actually a little concerned about reusing the
> > > committed
> > > > > > offset for log retention. It essentially overloads the offset
> > commit
> > > > with
> > > > > > both checkpointing and consume-based log retention, which may not
> > > work
> > > > > when
> > > > > > people want to separate those two functions. People can use app
> > side
> > > > > > coordination + trim() to work around this issue. But I am not sure
> > > > > > if the requirement of separating offset commit from consume-based
> log
> > > > > > retention is general enough to be addressed specifically.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 25, 2016 at 3:00 PM, Joel Koshy <jjkoshy.w@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > +1 - I was thinking the exact same thing.
> > > > > > >
> > > > > > > On Tue, Oct 25, 2016 at 2:52 PM, Jun Rao <ju...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > > > One of the main reasons for retaining messages on the broker
> > > after
> > > > > > > > consumption is to support replay. A common reason for replay is
> > > > > > > > to fix an
> > > > > > > > application error. So, it seems that it's a bit hard to
> delete
> > > log
> > > > > > > segments
> > > > > > > > just based on the committed offsets that the broker knows. An
> > > > > > alternative
> > > > > > > > approach is to support an api that can trim the log up to a
> > > > specified
> > > > > > > > offset (similar to what's being discussed in KIP-47). This
> way,
> > > an
> > > > > > > > application can control when and how much to trim the log.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Mon, Oct 24, 2016 at 11:11 AM, Guozhang Wang <
> > > > wangguoz@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Overall I think the motivation is common and of interest to
> > > > > > > > > lots of users.
> > > > > > > > > Would like to throw my two cents on this discussion:
> > > > > > > > >
> > > > > > > > > 1. Kafka topics can be used in different ways. For some
> > > > categories
> > > > > of
> > > > > > > > > topics (think: "pageView" event topics), it is a shared
> topic
> > > > among
> > > > > > > > > different teams / apps within the organization and lots of
> > > > > temporary
> > > > > > > > > consumers (for debugging, trouble shooting, prototype
> > > > development,
> > > > > > etc)
> > > > > > > > can
> > > > > > > > > come and go dynamically, in which case it is hard to track all
> > > > > > > > > of such consumers and maintain the minimum committed offsets; on the
> > > other
> > > > > > hand,
> > > > > > > > > there are another category of topics (think: stream-app
> owned
> > > > > > > > intermediate
> > > > > > > > > topics like "pricing-enriched-bid-activity", as Becket
> > > mentioned
> > > > > > > above)
> > > > > > > > > which are owned by only one or a few apps, and
> > > hence
> > > > > the
> > > > > > > > > consumer groups for those topics are pre-defined and
> roughly
> > > > > static.
> > > > > > In
> > > > > > > > > this case I think it makes sense to allow such consumer-driven
> > > > > > > > > log retention features.
> > > > > > > > >
> > > > > > > > > 2. In this case, my question is then whether this
> bookkeeping
> > > of
> > > > > > > > > min-committed-offsets should be done at the brokers side or
> > it
> > > > > should
> > > > > > > be
> > > > > > > > on
> > > > > > > > > the app side. My gut feeling is that it could be better
> > > bookkept
> > > > on
> > > > > > the
> > > > > > > > app
> > > > > > > > > (i.e. client) side which has the full information of the
> > > > > "registered
> > > > > > > > > consumer groups" for certain topics, and then knows the
> > > > > > > > > min-committed-offsets. And a slightly-modified KIP-47
> > mentioned
> > > > by
> > > > > > Dong
> > > > > > > > > could a better fit, where a) app side bookkeep the
> > > > consumer-driven
> > > > > > min
> > > > > > > > > offset based on their committed offsets, by either talking
> to
> > > the
> > > > > > > > consumer
> > > > > > > > > clients directly or query broker for the committed offsets
> of
> > > > those
> > > > > > > > > registered consumer groups, and then b) write
> > > > > > > > > *log.retention.min.offset* periodically
> > > > > > > > > to broker to let it delete old segments before that offset
> > > (NOTE
> > > > > that
> > > > > > > the
> > > > > > > > > semantics is exactly the same as to KIP-47, while the only
> > > > > difference
> > > > > > > is
> > > > > > > > > that we use offset instead of timestamp to indicate, which
> > can
> > > be
> > > > > > honor
> > > > > > > > by
> > > > > > > > > the same implementation of KIP-47 on broker side).
> > > > > > > > >
> > > > > > > > > My arguments for letting the app side bookkeep such
> > > > min-offsets
> > > > > > and
> > > > > > > > only
> > > > > > > > > let brokers take requests to delete segments accordingly
> > are
> > > > 1)
> > > > > > > > keeping
> > > > > > > > > the broker simple, without querying each other about such
> > > > > > > > > offsets or doing the min() calculation, rather only keeping / deleting
> > > > messages
> > > > > > > from
> > > > > > > > > client admin requests, and 2) allowing more generalized
> > > > > client-driven
> > > > > > > log
> > > > > > > > > retention policies with KIP-47 (i.e. broker is brainless
> and
> > > only
> > > > > > take
> > > > > > > > > requests while client-app can apply any customized logic to
> > > > > determine
> > > > > > > the
> > > > > > > > > config values of *log.retention.min.offset* or
> > > > > > > > > *log.retention.min.timestamp*
> > > > > > > > > that
> > > > > > > > > they send to the brokers).
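The app-side bookkeeping Guozhang describes could be sketched roughly as below (illustrative Python; `fetch_committed` and `alter_topic_config` are hypothetical stand-ins for an OffsetFetch call and a topic-config update, and the config key reuses the *log.retention.min.offset* name proposed in this message):

```python
# Step (a): the app computes the min committed offset over its registered groups.
def compute_min_offset(registered_groups, fetch_committed):
    offsets = [fetch_committed(g) for g in registered_groups]
    offsets = [o for o in offsets if o is not None]  # groups with no commit yet are skipped
    return min(offsets) if offsets else None

# Step (b): the app periodically pushes that minimum to the broker as a
# retention floor; the broker stays "brainless" and just honors the config.
def publish_retention_floor(topic, registered_groups, fetch_committed, alter_topic_config):
    floor = compute_min_offset(registered_groups, fetch_committed)
    if floor is not None:
        alter_topic_config(topic, "log.retention.min.offset", floor)
```

This keeps the min() calculation and group registry entirely on the client side, matching argument 1) above.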
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <
> > > > becket.qin@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi David,
> > > > > > > > > >
> > > > > > > > > > > 1. What scenario is used to this configuration?
> > > > > > > > > >
> > > > > > > > > > One scenario is stream processing pipeline. In a stream
> > > > > processing
> > > > > > > DAG,
> > > > > > > > > > there will be a bunch of intermediate results, we only
> care
> > > > about
> > > > > > the
> > > > > > > > > > consumer group that is in the downstream of the DAG, but
> > not
> > > > > other
> > > > > > > > > groups.
> > > > > > > > > > Ideally we want to delete the log of the intermediate
> > topics
> > > > > right
> > > > > > > > after
> > > > > > > > > > all the downstream processing jobs have successfully
> > processed
> > > > the
> > > > > > > > > messages.
> > > > > > > > > > In that case, we only care about the downstream
> processing
> > > > jobs,
> > > > > > but
> > > > > > > > not
> > > > > > > > > > other groups. That means if a downstream job did not
> > commit
> > > > > offset
> > > > > > > for
> > > > > > > > > > some reason, we want to wait for that job. Without the
> > > > predefined
> > > > > > > > > > interested group, it is hard to achieve this.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 2. Yes, the configuration should be at topic level and
> set
> > > > > > > dynamically.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <25...@qq.com>
> > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Mayuresh,
> > > > > > > > > > >     Thanks for the reply:
> > > > > > > > > > > 1.  During the log retention check, the broker first finds
> > > > > > > > > > > all the consumer groups which are consuming this topic, and
> > > > > > > > > > > queries the committed offset of each of these groups for the
> > > > > > > > > > > topic using the OffsetFetch API. The min commit offset is the
> > > > > > > > > > > minimum among these committed offsets.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 2.  If the console consumer reads and commits, its commit
> > > > > > > > > > > offset will be used to calculate the min commit offset for
> > > > > > > > > > > this topic. We can avoid such random consumers using the
> > > > > > > > > > > method Becket suggested.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 3.  It will not delete the log immediately; the log will
> > > > > > > > > > > stay for some time (retention.commitoffset.ms), and after
> > > > > > > > > > > that we only delete the log segments whose offsets are less
> > > > > > > > > > > than the min commit offset. So the user can still rewind its
> > > > > > > > > > > offset within log.retention.ms.
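The rule in point 3 could be sketched like this (illustrative Python; the names are hypothetical, not broker code):

```python
# A segment is eligible for consumed log retention only if every offset in it
# is below the min commit offset AND it is older than retention.commitoffset.ms,
# which leaves a window in which consumers can still rewind their offsets.
def eligible_for_consumed_retention(segment_last_offset, segment_age_ms,
                                    min_commit_offset, retention_commitoffset_ms):
    return (segment_last_offset < min_commit_offset
            and segment_age_ms >= retention_commitoffset_ms)
```

Both conditions must hold, so a fully consumed segment still survives for the rewind window, and an unconsumed segment survives regardless of age (until the normal log.retention.ms kicks in).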
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > David
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > ------------------ Original Message ------------------
> > > > > > > > > > > From: "Mayuresh Gharat";<gh...@gmail.com>;
> > > > > > > > > > > Date: Wednesday, October 19, 2016, 10:25 AM
> > > > > > > > > > > To: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > >
> > > > > > > > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention
> > > > > > > > > > > before log retention
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Hi David,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > >
> > > > > > > > > > > I had some questions/suggestions :
> > > > > > > > > > >
> > > > > > > > > > > It would be great if you can explain with an example
> > about
> > > > how
> > > > > > the
> > > > > > > > min
> > > > > > > > > > > offset for all the consumers will be calculated, in the
> > > KIP.
> > > > > > > > > > > What I meant was, it would be great to understand with
> a
> > > > pseudo
> > > > > > > > > > > code/workflow if possible, how each broker knows all
> the
> > > > > > consumers
> > > > > > > > for
> > > > > > > > > > the
> > > > > > > > > > > given topic-partition and how the min is calculated.
> > > > > > > > > > >
> > > > > > > > > > > Also it would be good to understand what happens if we
> > > start
> > > > a
> > > > > > > > console
> > > > > > > > > > > consumer which would actually start reading from the
> > > > beginning
> > > > > > > offset
> > > > > > > > > and
> > > > > > > > > > > commit and crash immediately. How will the segments get
> > > > > deleted?
> > > > > > > > > > >
> > > > > > > > > > > Will it delete all the log segments if all the
> consumers
> > > have
> > > > > > read
> > > > > > > > till
> > > > > > > > > > > latest? If yes, would we be able to handle a scenario where
> > > > > > > > > > > we say that the user can rewind its offset to reprocess the
> > > > > > > > > > > data, since log.retention.ms might not have been reached?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Mayuresh
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <
> > > > > > becket.qin@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for replies to the questions.
> > > > > > > > > > > >
> > > > > > > > > > > > I think one major thing still not clear at this point is
> > > > > > > > > > > > whether the
> > > > > > > > > > > > brokers will only apply the consumed log retention
> to a
> > > > > > specific
> > > > > > > > set
> > > > > > > > > of
> > > > > > > > > > > > interested consumer groups, or it does not have such
> a
> > > set
> > > > of
> > > > > > > > > consumer
> > > > > > > > > > > > groups.
> > > > > > > > > > > >
> > > > > > > > > > > > For example, for topic T, assume we know that there
> > will
> > > be
> > > > > two
> > > > > > > > > > > downstream
> > > > > > > > > > > > consumer groups CG1 and CG2 consuming data from topic
> > T.
> > > > Will
> > > > > > we
> > > > > > > > add
> > > > > > > > > a
> > > > > > > > > > > > topic configurations such as
> > > > > > > > > > > > "log.retention.commitoffset.
> interested.groups=CG1,CG2"
> > > to
> > > > > > topic
> > > > > > > T
> > > > > > > > so
> > > > > > > > > > > that
> > > > > > > > > > > > the brokers only care about CG1 and CG2. The
> committed
> > > > > offsets
> > > > > > of
> > > > > > > > > other
> > > > > > > > > > > > groups are not interested and won't have any impact
> on
> > > the
> > > > > > > > committed
> > > > > > > > > > > offset
> > > > > > > > > > > > based log retention.
> > > > > > > > > > > >
> > > > > > > > > > > > It seems the current proposal does not have an
> > > "interested
> > > > > > > consumer
> > > > > > > > > > group
> > > > > > > > > > > > set" configuration, so that means any random consumer
> > > group
> > > > > may
> > > > > > > > > affect
> > > > > > > > > > > the
> > > > > > > > > > > > committed offset based log retention.
> > > > > > > > > > > >
> > > > > > > > > > > > I think the committed offset based log retention
> seems
> > > more
> > > > > > > useful
> > > > > > > > in
> > > > > > > > > > > cases
> > > > > > > > > > > > where we already know which consumer groups will be
> > > > consuming
> > > > > > > from
> > > > > > > > > this
> > > > > > > > > > > > topic, so we will only wait for those consumer groups
> > but
> > > > > > ignore
> > > > > > > > the
> > > > > > > > > > > > others. If a topic will be consumed by many unknown
> or
> > > > > > > > unpredictable
> > > > > > > > > > > > consumer groups, it seems the existing time based log
> > > > > > > > > > > > retention is much
> > > > > > > > > > > > simpler and clearer. So I would argue we don't
> need
> > > to
> > > > > > > address
> > > > > > > > > the
> > > > > > > > > > > case
> > > > > > > > > > > > that some groups may come later in the committed
> offset
> > > > based
> > > > > > > > > > retention.
> > > > > > > > > > > >
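The "interested consumer groups" variant Becket raises could look roughly like this (Python sketch; `log.retention.commitoffset.interested.groups` is the hypothetical config name proposed in this message):

```python
# Only groups listed in the topic config bound the retention decision;
# committed offsets of any other (random, temporary) groups are ignored.
def interested_min_offset(config_value, all_group_offsets):
    """config_value: e.g. "CG1,CG2"; all_group_offsets: {group_id: offset}."""
    interested = [g.strip() for g in config_value.split(",") if g.strip()]
    relevant = {g: all_group_offsets.get(g) for g in interested}
    if any(o is None for o in relevant.values()):
        return None  # an interested group has not committed yet: keep waiting
    return min(relevant.values())
```

Note how this captures the waiting behavior Becket describes: a listed group that has not committed blocks the consumed retention, while unlisted groups have no effect.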
> > > > > > > > > > > > That said, there may still be value to keep the data
> > for
> > > > some
> > > > > > > time
> > > > > > > > > even
> > > > > > > > > > > > after all the interested consumer groups have
> consumed
> > > the
> > > > > > > > messages.
> > > > > > > > > > For
> > > > > > > > > > > > example, in a pipelined stream processing DAG, we may
> > > want
> > > > to
> > > > > > > keep
> > > > > > > > > the
> > > > > > > > > > > data
> > > > > > > > > > > > of an intermediate topic for some time in case the
> job
> > > > fails.
> > > > > > So
> > > > > > > we
> > > > > > > > > can
> > > > > > > > > > > > resume from a previously succeeded stage instead of
> > > > > > > > > > > > restarting the entire
> > > > > > > > > > > > pipeline. Or we can use the intermediate topic for
> some
> > > > > > debugging
> > > > > > > > > work.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <
> > 254479818@qq.com>
> > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > >     The KIP is used to solve both these 2 cases, we
> > > > > specify a
> > > > > > > > small
> > > > > > > > > > > > > consumed log retention time to deleted the consumed
> > > data
> > > > > and
> > > > > > > > avoid
> > > > > > > > > > > losing
> > > > > > > > > > > > > un-consumed data.
> > > > > > > > > > > > > And the specify a large force log retention time
> used
> > > as
> > > > > > higher
> > > > > > > > > bound
> > > > > > > > > > > for
> > > > > > > > > > > > > the data.  I will update the KIP for this info.
> > > > > > > > > > > > >     Another solution I think may be ok is to
> support
> > an
> > > > API
> > > > > > to
> > > > > > > > > delete
> > > > > > > > > > > the
> > > > > > > > > > > > > inactive group?  If the group is in inactive, but
> > it's
> > > > > commit
> > > > > > > > > offset
> > > > > > > > > > is
> > > > > > > > > > > > > also in the __commit_offsets topic and
> > > > > > > > > > > > > stay in the offset cache,  we can delete it via
> this
> > > API.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > David
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > ------------------ Original Message ------------------
> > > > > > > > > > > > > From: "Dong Lin";<li...@gmail.com>;
> > > > > > > > > > > > > Date: Friday, October 14, 2016, 5:01 AM
> > > > > > > > > > > > > To: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > > > >
> > > > > > > > > > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention
> > > > > > > > > > > > > before log retention
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > >
> > > > > > > > > > > > > As explained in the motivation section of the KIP,
> > the
> > > > > > problem
> > > > > > > is
> > > > > > > > > > that
> > > > > > > > > > > if
> > > > > > > > > > > > > log retention is too small, we may lose data; and
> if
> > > log
> > > > > > > > retention
> > > > > > > > > is
> > > > > > > > > > > too
> > > > > > > > > > > > > large, then we waste disk space. Therefore, we need
> > to
> > > > > solve
> > > > > > > one
> > > > > > > > if
> > > > > > > > > > the
> > > > > > > > > > > > two
> > > > > > > > > > > > > problems -- allow data to be persisted longer for
> > > > > consumption
> > > > > > > if
> > > > > > > > > log
> > > > > > > > > > > > > retention is set too small, or allow data to be
> > expired
> > > > > > earlier
> > > > > > > > if
> > > > > > > > > > log
> > > > > > > > > > > > > retention is too large. I think the KIP probably
> > needs
> > > to
> > > > > > make
> > > > > > > > this
> > > > > > > > > > > clear
> > > > > > > > > > > > > and explain which one is rejected and why. Note
> that
> > > the
> > > > > > choice
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > > two
> > > > > > > > > > > > > affects the solution -- if we want to address the
> > first
> > > > > > problem
> > > > > > > > > then
> > > > > > > > > > > > > log.retention.ms should be used as lower bound on
> > the
> > > > > actual
> > > > > > > > > > retention
> > > > > > > > > > > > > time, and if we want to address the second problem
> > then
> > > > the
> > > > > > > > > > > > > log.retention.ms
> > > > > > > > > > > > > should be used as higher bound on the actual
> > retention
> > > > > time.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In both cases, we probably need to figure out a way
> > to
> > > > > > > determine
> > > > > > > > > > > "active
> > > > > > > > > > > > > consumer group". Maybe we can compare the
> > > > > > > time-since-last-commit
> > > > > > > > > > > against
> > > > > > > > > > > > a
> > > > > > > > > > > > > threshold to determine this. In addition, the
> > threshold
> > > > can
> > > > > > be
> > > > > > > > > > > overridden
> > > > > > > > > > > > > either per-topic or per-groupId. If we go along
> this
> > > > route,
> > > > > > the
> > > > > > > > > > > rejected
> > > > > > > > > > > > > solution (per-topic vs. per-groupId) should
> probably
> > be
> > > > > > > explained
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > > KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Dong
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <
> > > > > > > lindong28@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your explanation. There still seems to
> > be
> > > > > issue
> > > > > > > with
> > > > > > > > > > this
> > > > > > > > > > > > > > solution. Please see my comment inline.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <
> > > > 254479818@qq.com>
> > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hi Dong,
> > > > > > > > > > > > > >>     Sorry for the delay, here are the comments:
> > > > > > > > > > > > > >> 1. I think we should distinguish these two cases:
> > > > > > > > > > > > > >> (1) group has no member, but has commit offset :
> > In
> > > > > this
> > > > > > > case
> > > > > > > > > we
> > > > > > > > > > > > should
> > > > > > > > > > > > > >> consider its commit offset
> > > > > > > > > > > > > >> (2) group has no member, no commit offset:  Skip
> > > this
> > > > > > group
> > > > > > > > > > > > > >> Is it ok?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> ListGroup API can list the groups,  but this API
> > > only
> > > > > show
> > > > > > > the
> > > > > > > > > > > Online
> > > > > > > > > > > > > >> Group, so we should enhance the listGroup API to
> > > list
> > > > > > those
> > > > > > > > > groups
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> case (1)
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Say some user starts a consumer to consume
> topic A
> > > > with
> > > > > > > > > > > > > > enable.auto.commit = true. Later they change the
> > > group
> > > > > name
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > > > config.
> > > > > > > > > > > > > > Then the proposed solution will never execute
> > > consumed
> > > > > log
> > > > > > > > > > retention
> > > > > > > > > > > > for
> > > > > > > > > > > > > > the topic A, right? I think group name change is
> > > pretty
> > > > > > > common
> > > > > > > > > and
> > > > > > > > > > we
> > > > > > > > > > > > > > should take care of this issue. One possible
> > solution
> > > > is
> > > > > to
> > > > > > > > add a
> > > > > > > > > > > > config
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > specify the maximum time since last offset commit
> > > > before
> > > > > we
> > > > > > > > > > consider
> > > > > > > > > > > a
> > > > > > > > > > > > > > group is inactive.
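Dong's threshold idea might be sketched as follows (illustrative Python; the names are hypothetical, not an existing config):

```python
# A group counts as active only if its last offset commit is newer than a
# configurable threshold; inactive groups are ignored when computing the
# consumed-retention offset, so a renamed or abandoned group (with a stale
# committed offset left behind) cannot block cleanup forever.
def active_groups(last_commit_ms, now_ms, max_idle_ms):
    """last_commit_ms: {group_id: wall-clock time of the group's last commit}."""
    return {g for g, t in last_commit_ms.items() if now_ms - t <= max_idle_ms}
```

The threshold `max_idle_ms` is what would be overridden per-topic or per-groupId, as Dong suggests later in this message.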
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> 2. Because every consumer group may appear in
> > > > different
> > > > > > > time,
> > > > > > > > > say,
> > > > > > > > > > > > group
> > > > > > > > > > > > > >> 1 start to consume in day 1, group 2 start to
> > > consume
> > > > in
> > > > > > day
> > > > > > > > 2.
> > > > > > > > > > If
> > > > > > > > > > > we
> > > > > > > > > > > > > >> delete the log segment right away,
> > > > > > > > > > > > > >> group 2 can not consume these message.  So we
> hope
> > > the
> > > > > > > > messages
> > > > > > > > > > can
> > > > > > > > > > > > hold
> > > > > > > > > > > > > >> for a specified time.  I think many use-cases
> will
> > > > need
> > > these

> > > > > > > > > > > configs,
> > > > > > > > > > > > > if
> > > > > > > > > > > > > >> there are many consumer groups.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > > If we want to take care of group 2, can we simply
> > > > disable
> > > > > > > > > consumed
> > > > > > > > > > > log
> > > > > > > > > > > > > > retention for the topic and set log retention to
> 1
> > > day?
> > > > > Can
> > > > > > > you
> > > > > > > > > > > explain
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > benefit of enabling consumed log retention and
> set
> > > > > consumed
> > > > > > > log
> > > > > > > > > > > > retention
> > > > > > > > > > > > > > to 1 day?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Currently the flow graph in the KIP suggests that
> > we
> > > > > delete
> > > > > > > > data
> > > > > > > > > > iff
> > > > > > > > > > > > > > (consumed log retention is triggered OR forced
> log
> > > > > > retention
> > > > > > > is
> > > > > > > > > > > > > triggered).
> > > > > > > > > > > > > > And alternative solution is to delete data iff (
> > > > > (consumed
> > > > > > > log
> > > > > > > > > > > > retention
> > > > > > > > > > > > > is
> > > > > > > > > > > > > > disabled OR consumed log retention is triggered)
> > AND
> > > > > forced
> > > > > > > log
> > > > > > > > > > > > retention
> > > > > > > > > > > > > > is triggered). I would argue that the 2nd scheme
> is
> > > > > better.
> > > > > > > Say
> > > > > > > > > the
> > > > > > > > > > > > > > consumed log retention is enabled. The 1st scheme
> > > > > basically
> > > > > > > > > > > interprets
> > > > > > > > > > > > > > forced log retention as the upper bound of the
> time
> > > the
> > > > > > data
> > > > > > > > can
> > > > > > > > > > stay
> > > > > > > > > > > > in
> > > > > > > > > > > > > > Kafka. The 2nd scheme interprets forced log
> > retention
> > > > as
> > > > > > the
> > > > > > > > > lower
> > > > > > > > > > > > bound
> > > > > > > > > > > > > of
> > > > > > > > > > > > > > the time the data can stay in Kafka, which is
> more
> > > > > > consistent
> > > > > > > > > with
> > > > > > > > > > > the
> > > > > > > > > > > > > > purpose of having this forced log retention (to
> > save
> > > > disk
> > > > > > > > space).
> > > > > > > > > > And
> > > > > > > > > > > > if
> > > > > > > > > > > > > we
> > > > > > > > > > > > > > adopt the 2nd solution, the use-case you
> suggested
> > > can
> > > > be
> > > > > > > > easily
> > > > > > > > > > > > > addressed
> > > > > > > > > > > > > > by setting forced log retention to 1 day and
> enable
> > > > > > consumed
> > > > > > > > log
> > > > > > > > > > > > > retention.
> > > > > > > > > > > > > > What do you think?
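The two deletion predicates being compared can be written out explicitly; this is an illustrative sketch with hypothetical function names:

```python
def should_delete_scheme1(consumed_triggered: bool, forced_triggered: bool) -> bool:
    # 1st scheme (current KIP flow): forced retention is an UPPER bound on
    # how long data stays -- either condition alone deletes the segment.
    return consumed_triggered or forced_triggered

def should_delete_scheme2(consumed_enabled: bool, consumed_triggered: bool,
                          forced_triggered: bool) -> bool:
    # 2nd scheme: forced retention is a LOWER bound -- a segment is deleted
    # only after forced retention has passed AND (consumed retention is
    # disabled or has also been satisfied).
    return (not consumed_enabled or consumed_triggered) and forced_triggered
```

Under the 2nd scheme, setting forced retention to 1 day with consumed retention enabled guarantees data survives at least 1 day even if every group consumed it immediately.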
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > >> David
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> ------------------ Original Message ------------------
> > > > > > > > > > > > > >> From: "Dong Lin";<li...@gmail.com>;
> > > > > > > > > > > > > >> Sent: Monday, October 10, 2016, 4:05 PM
> > > > > > > > > > > > > >> To: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Subject: Re: [DISCUSS] KIP-68 Add a consumed log
> > > retention
> > > > > > before
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Hey David,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks for reply. Please see comment inline.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <
> > > > > > > > > > > pengwei.li@huawei.com>
> > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hi Dong
> > > > > > > > > > > > > >> >    Thanks for the questions:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > 1.  Now we don't distinguish inactive or
> active
> > > > > groups.
> > > > > > > > > Because
> > > > > > > > > > in
> > > > > > > > > > > > > some
> > > > > > > > > > > > > >> > case maybe inactive group will become active
> > > again,
> > > > > and
> > > > > > > > using
> > > > > > > > > > the
> > > > > > > > > > > > > >> previous
> > > > > > > > > > > > > >> > commit offset.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > So we will not delete the log segment in the
> > > > consumer
> > > > > > > > > retention
> > > > > > > > > > if
> > > > > > > > > > > > > there
> > > > > > > > > > > > > >> > are some groups consume but not commit, but
> the
> > > log
> > > > > > > segment
> > > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > > > >> delete by
> > > > > > > > > > > > > >> >      the force retention.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> So in the example I provided, the consumed log
> > > > retention
> > > > > > > will
> > > > > > > > be
> > > > > > > > > > > > > >> effectively disabled, right? This seems to be a
> > real
> > > > > > problem
> > > > > > > > in
> > > > > > > > > > > > > operation
> > > > > > > > > > > > > >> -- we don't want log retention to be
> > > unintentionally
> > > > > > > disabled
> > > > > > > > > > > simply
> > > > > > > > > > > > > >> because someone start a tool to consume from
> that
> > > > topic.
> > > > > > > > Either
> > > > > > > > > > this
> > > > > > > > > > > > KIP
> > > > > > > > > > > > > >> should provide a way to handle this, or there
> > should
> > > > be
> > > > > a
> > > > > > > way
> > > > > > > > > for
> > > > > > > > > > > > > operator
> > > > > > > > > > > > > >> to be aware of such case and be able to
> re-enable
> > > > > consumed
> > > > > > > log
> > > > > > > > > > > > retention
> > > > > > > > > > > > > >> for the topic. What do you think?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > 2.  These configs are used to determine the
> out
> > of
> > > > > date
> > > > > > > time
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > > >> > consumed retention, like the parameters of the
> > > force
> > > > > > > > retention
> > > > > > > > > > > > > >> > (log.retention.hours, log.retention.minutes,
> > > > > > > > log.retention.ms
> > > > > > > > > ).
> > > > > > > > > > > For
> > > > > > > > > > > > > >> > example, users want the save the log for 3
> days,
> > > > > after 3
> > > > > > > > days,
> > > > > > > > > > > kafka
> > > > > > > > > > > > > >> will
> > > > > > > > > > > > > >> > delete the log segments which are
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > consumed by all the consumer group.  The log
> > > > retention
> > > > > > > > thread
> > > > > > > > > > need
> > > > > > > > > > > > > these
> > > > > > > > > > > > > >> > parameters.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > It makes sense to have configs such as
> > > > > log.retention.ms
> > > > > > > --
> > > > > > > > it
> > > > > > > > > > is
> > > > > > > > > > > > used
> > > > > > > > > > > > > >> to
> > > > > > > > > > > > > >> make data available for up to a configured
> amount
> > of
> > > > > time
> > > > > > > > before
> > > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > > > >> deleted. My question is what is the use-case for
> > > > making
> > > > > > log
> > > > > > > > > > > available
> > > > > > > > > > > > > for
> > > > > > > > > > > > > >> another e.g. 3 days after it has been consumed
> by
> > > all
> > > > > > > consumer
> > > > > > > > > > > groups.
> > > > > > > > > > > > > The
> > > > > > > > > > > > > >> purpose of this KIP is to allow log to be
> deleted
> > > > right
> > > > > as
> > > > > > > > long
> > > > > > > > > as
> > > > > > > > > > > all
> > > > > > > > > > > > > >> interested consumer groups have consumed it. Can
> > you
> > > > > > > provide a
> > > > > > > > > > > > use-case
> > > > > > > > > > > > > >> for
> > > > > > > > > > > > > >> keeping log available for longer time after it
> has
> > > > been
> > > > > > > > consumed
> > > > > > > > > > by
> > > > > > > > > > > > all
> > > > > > > > > > > > > >> groups?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > >> > David
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > Hey David,
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Thanks for the KIP. Can you help with the
> > > > following
> > > > > > two
> > > > > > > > > > > questions:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > 1) If someone start a consumer (e.g.
> > > > > > > > kafka-console-consumer)
> > > > > > > > > > to
> > > > > > > > > > > > > >> consume a
> > > > > > > > > > > > > >> > > topic for debug/validation purpose, a
> randome
> > > > > consumer
> > > > > > > > group
> > > > > > > > > > may
> > > > > > > > > > > > be
> > > > > > > > > > > > > >> > created
> > > > > > > > > > > > > >> > > and offset may be committed for this
> consumer
> > > > group.
> > > > > > If
> > > > > > > no
> > > > > > > > > > > offset
> > > > > > > > > > > > > >> commit
> > > > > > > > > > > > > >> > is
> > > > > > > > > > > > > >> > > made for this consumer group in the future,
> > will
> > > > > this
> > > > > > > > > > > effectively
> > > > > > > > > > > > > >> > > disable consumed log retention for this
> topic?
> > > In
> > > > > > other
> > > > > > > > > words,
> > > > > > > > > > > how
> > > > > > > > > > > > > do
> > > > > > > > > > > > > >> > this
> > > > > > > > > > > > > >> > > KIP distinguish active consumer group from
> > > > inactive
> > > > > > > ones?
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > 2) Why do we need new configs such as
> > > > > > > > > > > > log.retention.commitoffset.hou
> > > > > > > > > > > > > >> rs?
> > > > > > > > > > > > > >> > Can
> > > > > > > > > > > > > >> > >we simply delete log segments if consumed log
> > > > > retention
> > > > > > > is
> > > > > > > > > > > enabled
> > > > > > > > > > > > > for
> > > > > > > > > > > > > >> > this
> > > > > > > > > > > > > >> > > topic and all consumer groups have consumed
> > > > messages
> > > > > > in
> > > > > > > > the
> > > > > > > > > > log
> > > > > > > > > > > > > >> segment?
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > > >> > > Dong
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <
> > > > > > > > > > > pengwei.li@huawei.com
> > > > > > > > > > > > >
> > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Hi Becket,
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >   Thanks for the feedback:
> > > > > > > > > > > > > >> > > > 1.  We use the simple consumer api to
> query
> > > the
> > > > > > commit
> > > > > > > > > > offset,
> > > > > > > > > > > > so
> > > > > > > > > > > > > we
> > > > > > > > > > > > > >> > don't
> > > > > > > > > > > > > >> > > > need to specify the consumer group.
> > > > > > > > > > > > > >> > > > 2.  Every broker using the simple consumer
> > > > > > > > > > api(OffsetFetchKey)
> > > > > > > > > > > > to
> > > > > > > > > > > > > >> query
> > > > > > > > > > > > > >> > > > the commit offset in the log retention
> > > process.
> > > > > The
> > > > > > > > > client
> > > > > > > > > > > can
> > > > > > > > > > > > > >> commit
> > > > > > > > > > > > > >> > > > offset or not.
> > > > > > > > > > > > > >> > > > 3.  It does not need to distinguish the
> > > follower
> > > > > > > brokers
> > > > > > > > > or
> > > > > > > > > > > > leader
> > > > > > > > > > > > > >> > > > brokers,  every brokers can query.
> > > > > > > > > > > > > >> > > > 4.  We don't need to change the protocols,
> > we
> > > > > mainly
> > > > > > > > > change
> > > > > > > > > > > the
> > > > > > > > > > > > > log
> > > > > > > > > > > > > >> > > > retention process in the log manager.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >   One question is the query min offset
> need
> > > > > > > > O(partitions *
> > > > > > > > > > > > groups)
> > > > > > > > > > > > > >> time
> > > > > > > > > > > > > >> > > > complexity, another alternative is to
> build
> > an
> > > > > > > internal
> > > > > > > > > > topic
> > > > > > > > > > > to
> > > > > > > > > > > > > >> save
> > > > > > > > > > > > > >> > every
> > > > > > > > > > > > > >> > > > partition's min offset, it can reduce to
> > O(1).
> > > > > > > > > > > > > >> > > > I will update the wiki for more details.
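The O(partitions * groups) min-offset lookup described above might look like the following sketch (hypothetical names; a real broker would consult its offsets cache or the proposed internal topic rather than issuing per-group fetches):

```python
def min_committed_offset(partition, committed_offsets_by_group):
    """Return the smallest committed offset for `partition` across all
    groups that have committed for it, or None if no group has.

    `committed_offsets_by_group` maps group id -> {partition: offset}."""
    offsets = [
        offsets_for_group[partition]
        for offsets_for_group in committed_offsets_by_group.values()
        if partition in offsets_for_group  # skip groups with no commit here
    ]
    return min(offsets) if offsets else None

# Log segments entirely below the minimum committed offset are the ones
# eligible for consumed log retention.
```

Caching these minimums in an internal topic, as suggested above, turns the per-retention-pass cost into an O(1) lookup per partition.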
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > >> > > > David
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > > Hi Pengwei,
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Thanks for the KIP proposal. It is a
> very
> > > > useful
> > > > > > > KIP.
> > > > > > > > > At a
> > > > > > > > > > > > high
> > > > > > > > > > > > > >> > level,
> > > > > > > > > > > > > >> > > > the
> > > > > > > > > > > > > >> > > > > proposed behavior looks reasonable to
> me.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > However, it seems that some of the
> details
> > > are
> > > > > not
> > > > > > > > > > mentioned
> > > > > > > > > > > > in
> > > > > > > > > > > > > >> the
> > > > > > > > > > > > > >> > KIP.
> > > > > > > > > > > > > >> > > > > For example,
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > 1. How will the expected consumer group
> be
> > > > > > > specified?
> > > > > > > > Is
> > > > > > > > > > it
> > > > > > > > > > > > > >> through
> > > > > > > > > > > > > >> > a per
> > > > > > > > > > > > > >> > > > > topic dynamic configuration?
> > > > > > > > > > > > > >> > > > > 2. How do the brokers detect the
> consumer
> > > > > offsets?
> > > > > > > Is
> > > > > > > > it
> > > > > > > > > > > > > required
> > > > > > > > > > > > > >> > for a
> > > > > > > > > > > > > >> > > > > consumer to commit offsets?
> > > > > > > > > > > > > >> > > > > 3. How do all the replicas know the
> about
> > > the
> > > > > > > > committed
> > > > > > > > > > > > offsets?
> > > > > > > > > > > > > >> > e.g. 1)
> > > > > > > > > > > > > >> > > > > non-coordinator brokers which do not
> have
> > > the
> > > > > > > > committed
> > > > > > > > > > > > offsets,
> > > > > > > > > > > > > >> 2)
> > > > > > > > > > > > > >> > > > > follower brokers which do not have
> > consumers
> > > > > > > directly
> > > > > > > > > > > > consuming
> > > > > > > > > > > > > >> from
> > > > > > > > > > > > > >> > it.
> > > > > > > > > > > > > >> > > > > 4. Is there any other changes need to be
> > > made
> > > > > > (e.g.
> > > > > > > > new
> > > > > > > > > > > > > >> protocols) in
> > > > > > > > > > > > > >> > > > > addition to the configuration change?
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > It would be great if you can update the
> > wiki
> > > > to
> > > > > > have
> > > > > > > > > more
> > > > > > > > > > > > > details.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei
> > (L)
> > > <
> > > > > > > > > > > > > >> pengwei.li@huawei.com>
> > > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > > Hi All,
> > > > > > > > > > > > > >> > > > > >    I have made a KIP to enhance the
> log
> > > > > > retention,
> > > > > > > > > > details
> > > > > > > > > > > > as
> > > > > > > > > > > > > >> > follows:
> > > > > > > > > > > > > >> > > > > > https://cwiki.apache.org/
> > > > > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > > > > >> > > > > > 68+Add+a+consumed+log+
> > > > > > > > retention+before+log+retention
> > > > > > > > > > > > > >> > > > > >    Now start a discuss thread for this
> > > KIP ,
> > > > > > > looking
> > > > > > > > > > > forward
> > > > > > > > > > > > > to
> > > > > > > > > > > > > >> the
> > > > > > > > > > > > > >> > > > > > feedback.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > >> > > > > > David
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -Regards,
> > > > > > > > > > > Mayuresh R. Gharat
> > > > > > > > > > > (862) 250-7125
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
> -- Guozhang
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Posted by Becket Qin <be...@gmail.com>.
Hi David,

I think the trim() API is generally useful for consume-based retention as
well as other use cases. So we probably should have (1).

For (2), it is more of an optimization that does the users a favor. It
could be implemented on top of (1) if we want to. So maybe we can implement
(1) first and let the applications call trim() by themselves at this
point. This puts more burden on the application side, but that is not too
bad if there is only one downstream consumer group. In the future, if we
find more use cases where multiple downstream consumer groups need to
coordinate among themselves and broker-side help would make things
simpler, we can add (2) then.

Regarding the relation between this KIP and KIP-47: at a high level they
are very similar, i.e. trim() by timestamp vs. trim() by offset, so it is
worth thinking about them together. After KIP-79 we can search messages by
timestamp, which essentially translates a timestamp to offsets. So KIP-47
could also be built on top of the trim()-by-offset interface after
translating the timestamp to offsets. Jun has suggested an implementation
in the KIP-47 discussion thread which introduces a new TrimRequest. Would
you take a look and see if that could be used for KIP-68 as well?
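A consumed-log-retention admin tool layered on such a trim-by-offset API, as in option (1) above, could be sketched roughly as follows (all names are hypothetical, since the TrimRequest protocol is still under discussion in KIP-47):

```python
def consumed_log_trim(partitions, committed, trim_partition):
    """For each partition, trim up to the minimum offset committed by ALL
    interested groups.

    `committed` maps group id -> {partition: offset}.
    `trim_partition(partition, offset)` stands in for sending a
    TrimRequest to the partition's replicas.
    Returns {partition: trimmed_to_offset} for partitions actually trimmed."""
    results = {}
    for p in partitions:
        per_group = [c[p] for c in committed.values() if p in c]
        if len(per_group) < len(committed):
            continue  # some group has not committed for p yet; skip to be safe
        target = min(per_group)
        trim_partition(p, target)  # brokers delete segments below `target`
        results[p] = target
    return results
```

This keeps the coordination (which groups count, how conservative to be) in the tool, while the brokers only implement the simple trim primitive.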

Thanks,

Jiangjie (Becket) Qin



On Sun, Oct 30, 2016 at 2:24 AM, 东方甲乙 <25...@qq.com> wrote:

> Hi All,
>
>
> As per our discussion, there are two ways to clean the consumed log:
>
>
> 1) Use an Admin Tool to find the min commit offset for some topics of the
> specified set of consumer groups, then send the trim API to all the
> replicas of the brokers,
> then the brokers will start to trim the log segments of these topics.
>
>
> The benefit of this method is to keep the broker simple and more flexible
> for the users, but it is more complicated for the users to clean all the
> messages which are consumed.
>
>
> 2) Broker will periodically do the consumed log retention as the KIP
> mentioned. This method is simple for the users and it can automatically
> clean the consumed log, but it will add more query work to the brokers.
>
>
> Which method is better?
>
>
> Thanks,
> David
>
>
>
>
>
>
>
>
> ------------------ Original Message ------------------
> From: "Mayuresh Gharat";<gh...@gmail.com>;
> Sent: Saturday, October 29, 2016, 1:43 AM
> To: "dev"<de...@kafka.apache.org>;
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> I do agree with Guozhang on having applications request an external
> service(admin) that talks to kafka, for trimming, in which case this
> external service(admin) can check if it's OK to send the trim request to
> kafka brokers based on certain conditions.
> On broker side we can have authorization by way of ACLs maybe, saying that
> only this external admin service is allowed to call trim(). In this way we
> can actually move the main decision making process out of core.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Yes trim() should be an admin API and, if security is concerned, it
> should
> > be under admin authorization as well.
> >
> > For applications that need this feature, it then boils down to the
> problem
> > that they should request the authorization token from who operates Kafka
> > before starting their app to use in their own client, which I think is a
> > feasible requirement.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <
> > gharatmayuresh15@gmail.com
> > > wrote:
> >
> > > Hi Guozhang,
> > >
> > > I agree that pushing out the complexity of coordination to the client
> > > application makes it more simple for the broker in the sense that it
> does
> > > not have to be the decision maker regarding when to trim and till what
> > > offset. An I agree that if we go in this direction, providing an offset
> > > parameter makes sense.
> > >
> > >
> > > But since the main motivation for this seems like saving or reclaiming
> > the
> > > disk space on broker side, I am not 100% sure how good it is to rely on
> > the
> > > client application to be a good citizen and call the trim API.
> > > Also I see the trim() api as more of an admin api rather than client
> API.
> > >
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > > Here are my thoughts:
> > > >
> > > > If there are indeed multiple consumer groups on the same topic that
> > needs
> > > > to coordinate, it is equally complex if the coordination is on the
> > broker
> > > > or among the applications themselves: for the latter case, you would
> > > > imagine some coordination services used (like ZK) to register groups
> > for
> > > > that topic and let these groups agree upon the minimum offset that is
> > > safe
> > > > to trim for all of them; for the former case, we just need to move
> this
> > > > coordination service into the broker side, which to me is not a good
> > > design
> > > > under the principle of making broker simple.
> > > >
> > > > And as we discussed, there are scenarios where the offset to trim is
> > not
> > > > necessarily dependent on the committed offsets, even if the topic is
> > only
> > > > consumed by a single consumer group and we do not need any
> > coordination.
> > > So
> > > > I think it is appropriate to require an "offset parameter" in the
> trim
> > > API.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin <be...@gmail.com>
> > > wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > I think the trim() interface is generally useful. What I was
> > wondering
> > > is
> > > > > the following:
> > > > > if the user has multiple applications to coordinate, it seems
> simpler
> > > for
> > > > > the broker to coordinate instead of asking the applications to
> > > coordinate
> > > > > among themselves. If we let the broker do the coordination and do
> not
> > > > want
> > > > > to reuse committed offset for trim(), we kind of need something
> like
> > > > > "offset for trim", which do not seems to be general enough to have.
> > But
> > > > if
> > > > > there is a single application then we don't need to worry about the
> > > > > coordination hence this is no longer a problem.
> > > > >
> > > > > The use cases for multiple consumer groups I am thinking of is some
> > > kind
> > > > of
> > > > > fork in the DAG, i.e. one intermediate result stream used by
> multiple
> > > > > downstream jobs. But that may not be a big deal if the processing
> is
> > > > within
> > > > > the same application.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Oct 25, 2016 at 11:41 PM, Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello Becket,
> > > > > >
> > > > > > I am not 100 percent sure I get your points, reading the first
> half
> > > of
> > > > > the
> > > > > > paragraph I thought we were on the same page that "the committed
> > > > offsets
> > > > > > and the offsets the applications ( most likely the consumers)
> would
> > > > like
> > > > > to
> > > > > > tell the brokers to trim to, could be totally different", but
> then
> > > you
> > > > > said
> > > > > > "not sure if the requirement ... is general enough", which
> confused
> > > me
> > > > a
> > > > > > bit :) Anyways, I think the consumer committed offsets should be
> > > > > separated
> > > > > > from whatever the proposed APIs for telling the brokers to safely
> > > trim
> > > > > > their logs since they will not be read any more. And Jun also
> made
> > a
> > > > good
> > > > > > point about that regarding the replay scenarios, which also
> applies
> > > for
> > > > > > users who do not require the flexibility as you mentioned.
> > > > > >
> > > > > > Regarding the coordination complexity among applications
> > themselves,
> > > my
> > > > > gut
> > > > > > feeling is that, in practice, this feature would be mostly used
> > when
> > > > the
> > > > > > topic is solely consumed by only one group, and for cases where
> the
> > > > topic
> > > > > > is gonna be consumed by multiple groups, this feature would less
> > > likely
> > > > > be
> > > > > > applicable. And if there are indeed such cases, coordination
> cannot
> > > be
> > > > > > avoidable since otherwise how can a consumer group (hence a dev
> > team
> > > /
> > > > > > project / etc) tell if the other group is OK with trimming the
> > data?
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 25, 2016 at 6:58 PM, Becket Qin <
> becket.qin@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > The trim() interface would be useful in general. And I agree
> with
> > > > > > Guozhang
> > > > > > > that conceptually letting the application to decide when to
> > delete
> > > > the
> > > > > > > messages is more intuitive and flexible.
> > > > > > >
> > > > > > > That said, I am not sure if putting coordination on the
> > application
> > > > > side
> > > > > > is
> > > > > > > the best option. At a high level, there are two things to be
> > done:
> > > > > > > 1. Coordinate among all the interested consumer groups.
> > > > > > > 2. Telling the brokers to trim the log
> > > > > > >
> > > > > > > For (1), letting different applications coordinate among
> > themselves
> > > > is
> > > > > > more
> > > > > > > involved, and this logic may have to be implemented by
> different
> > > > > > > applications. As Guozhang mentioned, the most intuitive way may
> > be
> > > > > > looking
> > > > > > > at the committed offset for each of the groups. But the
> > > applications
> > > > > may
> > > > > > > still need to coordinate among themselves to avoid split brains
> > > > issues.
> > > > > > If
> > > > > > > there are many consumers from different applications, the
> brokers
> > > may
> > > > > > > potentially see a lot of offset queries. So, while letting the
> > > > consumer
> > > > > > > groups coordinate among themselves provides flexibility, it
> > doesn't
> > > > > look
> > > > > > > simpler overall. There seems a trade off between easiness of
> use
> > > and
> > > > > > > flexibility. For people who require flexibility, consumer side
> > > > > > coordination
> > > > > > > + trim() interface is the way to go. But for people who don't
> > > require
> > > > > > that,
> > > > > > > committed offset based retention seems simpler and does not
> need
> > > any
> > > > > > client
> > > > > > > side code change.
> > > > > > >
> > > > > > > For (2), in the current approach, the consumers tell the broker
> > > their
> > > > > > > positions by committing offsets. If we use trim(), it would be
> > more
> > > > > > > explicit. I am actually a little concerned about reusing the
> > > > committed
> > > > > > > offset for log retention. It essentially overloads the offset
> > > commit
> > > > > with
> > > > > > > both checkpointing and consume-based log retention, which may
> not
> > > > work
> > > > > > when
> > > > > > > people want to separate those two functions. People can use app
> > > side
> > > > > > > coordination + trim() to workaround this issue. But I am not
> sure
> > > if
> > > > > that
> > > > > > > the requirement of separating offset commit from consume-based
> > log
> > > > > > > retention is general enough to be addressed specifically.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Oct 25, 2016 at 3:00 PM, Joel Koshy <
> jjkoshy.w@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > +1 - I was thinking the exact same thing.
> > > > > > > >
> > > > > > > > On Tue, Oct 25, 2016 at 2:52 PM, Jun Rao <ju...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > > > One of the main reasons for retaining messages on the
> broker
> > > > after
> > > > > > > > > consumption is to support replay. A common reason for
> replay
> > is
> > > > to
> > > > > > fix
> > > > > > > > an
> > > > > > > > > application error. So, it seems that it's a bit hard to
> > delete
> > > > log
> > > > > > > > segments
> > > > > > > > > just based on the committed offsets that the broker knows.
> An
> > > > > > > alternative
> > > > > > > > > approach is to support an api that can trim the log up to a
> > > > > specified
> > > > > > > > > offset (similar to what's being discussed in KIP-47). This
> > way,
> > > > an
> > > > > > > > > application can control when and how much to trim the log.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Mon, Oct 24, 2016 at 11:11 AM, Guozhang Wang <
> > > > > wangguoz@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Overall I think the motivation is common and of interests
> > to
> > > > lots
> > > > > > of
> > > > > > > > > users.
> > > > > > > > > > Would like to throw my two cents on this discussion:
> > > > > > > > > >
> > > > > > > > > > 1. Kafka topics can be used in different ways. For some
> > > > > categories
> > > > > > of
> > > > > > > > > > topics (think: "pageView" event topics), it is a shared
> > topic
> > > > > among
> > > > > > > > > > different teams / apps within the organization and lots
> of
> > > > > > temporary
> > > > > > > > > > consumers (for debugging, trouble shooting, prototype
> > > > > development,
> > > > > > > etc)
> > > > > > > > > can
> > > > > > > > > > come and go dynamically, in which case it is hard to
> track
> > > all
> > > > of
> > > > > > > such
> > > > > > > > > > consumer and maintain the minimum committed offsets; on
> the
> > > > other
> > > > > > > hand,
> > > > > > > > > > there are another category of topics (think: stream-app
> > owned
> > > > > > > > > intermediate
> > > > > > > > > > topics like "pricing-enriched-bid-activity", as Becket
> > > > mentioned
> > > > > > > > above)
> > > > > > > > > > which are owned by only one or a few apps,
> and
> > > > hence
> > > > > > the
> > > > > > > > > > consumer groups for those topics are pre-defined and
> > roughly
> > > > > > static.
> > > > > > > In
> > > > > > > > > > this case I think it makes sense to allow such
> > consumer-driven
> > > > log
> > > > > > > > > retention
> > > > > > > > > > features.
> > > > > > > > > >
> > > > > > > > > > 2. In this case, my question is then whether this
> > bookkeeping
> > > > of
> > > > > > > > > > min-committed-offsets should be done at the broker side
> or
> > > it
> > > > > > should
> > > > > > > > be
> > > > > > > > > on
> > > > > > > > > > the app side. My gut feeling is that it could be better
> > > > bookkept
> > > > > on
> > > > > > > the
> > > > > > > > > app
> > > > > > > > > > (i.e. client) side which has the full information of the
> > > > > > "registered
> > > > > > > > > > consumer groups" for certain topics, and then knows the
> > > > > > > > > > min-committed-offsets. And a slightly-modified KIP-47
> > > mentioned
> > > > > by
> > > > > > > Dong
> > > > > > > > > > could be a better fit, where a) the app side bookkeeps the
> > > > > consumer-driven
> > > > > > > min
> > > > > > > > > > offset based on their committed offsets, by either
> talking
> > to
> > > > the
> > > > > > > > > consumer
> > > > > > > > > > clients directly or query broker for the committed
> offsets
> > of
> > > > > those
> > > > > > > > > > registered consumer groups, and then b) write
> > > > > > > > > > *log.retention.min.offset* periodically
> > > > > > > > > > to broker to let it delete old segments before that
> offset
> > > > (NOTE
> > > > > > that
> > > > > > > > the
> > > > > > > > > > semantics is exactly the same as to KIP-47, while the
> only
> > > > > > difference
> > > > > > > > is
> > > > > > > > > > that we use offset instead of timestamp to indicate,
> which
> > > can
> > > > be
> > > > > > > honored
> > > > > > > > > by
> > > > > > > > > > the same implementation of KIP-47 on broker side).
> > > > > > > > > >
> > > > > > > > > > My arguments for letting the app side to bookkeep such
> > > > > min-offsets
> > > > > > > and
> > > > > > > > > only
> > > > > > > > > > let brokers to take requests to delete segments
> accordingly
> > > are
> > > > > 1)
> > > > > > > > > keeping
> > > > > > > > > > the broker simple without any querying each other about
> > such
> > > > > > offsets
> > > > > > > > and
> > > > > > > > > > does the min() calculation, rather only keeping /
> deleting
> > > > > messages
> > > > > > > > from
> > > > > > > > > > client admin requests, and 2) allowing more generalized
> > > > > > client-driven
> > > > > > > > log
> > > > > > > > > > retention policies with KIP-47 (i.e. broker is brainless
> > and
> > > > only
> > > > > > > take
> > > > > > > > > > requests while client-app can apply any customized logic
> to
> > > > > > determine
> > > > > > > > the
> > > > > > > > > > config values of *log.retention.min.offset* or
> > > > > > > > > *log.retention.min.timestamp*
> > > > > > > > > > that
> > > > > > > > > > they send to the brokers).
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <
> > > > > becket.qin@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi David,
> > > > > > > > > > >
> > > > > > > > > > > > 1. What scenario is used to this configuration?
> > > > > > > > > > >
> > > > > > > > > > > One scenario is stream processing pipeline. In a stream
> > > > > > processing
> > > > > > > > DAG,
> > > > > > > > > > > there will be a bunch of intermediate result, we only
> > care
> > > > > about
> > > > > > > the
> > > > > > > > > > > consumer group that is in the downstream of the DAG,
> but
> > > not
> > > > > > other
> > > > > > > > > > groups.
> > > > > > > > > > > Ideally we want to delete the log of the intermediate
> > > topics
> > > > > > right
> > > > > > > > > after
> > > > > > > > > > > all the downstream processing jobs have successfully
> > > processed
> > > > > the
> > > > > > > > > > messages.
> > > > > > > > > > > In that case, we only care about the downstream
> > processing
> > > > > jobs,
> > > > > > > but
> > > > > > > > > not
> > > > > > > > > > > other groups. That means if a downstream job did not
> > > commit
> > > > > > offset
> > > > > > > > for
> > > > > > > > > > > some reason, we want to wait for that job. Without the
> > > > > predefined
> > > > > > > > > > > interested group, it is hard to achieve this.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 2. Yes, the configuration should be at topic level and
> > set
> > > > > > > > dynamically.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <
> 254479818@qq.com>
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Mayuresh,
> > > > > > > > > > > >     Thanks for the reply:
> > > > > > > > > > > > 1.  In the scheduled log retention check, the broker
> > > > > > > > > > > > first finds all the consumer groups which are consuming
> > > > > > > > > > > > this topic, and queries the committed offset of each
> > > > > > > > > > > > such group for the topic using the OffsetFetch API. The
> > > > > > > > > > > > min commit offset is the smallest of these committed
> > > > > > > > > > > > offsets.
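The min-commit-offset calculation described in point 1 can be sketched like this. Purely illustrative: `committed_offsets` would come from OffsetFetch responses, and the group names are made up.

```python
def min_commit_offset(committed_offsets):
    """Smallest committed offset across all consumer groups of a partition.
    None means no group has committed yet, so nothing is safe to delete."""
    offsets = [o for o in committed_offsets.values() if o is not None]
    return min(offsets) if offsets else None

# Three groups consuming the same topic-partition:
assert min_commit_offset({"CG1": 120, "CG2": 95, "CG3": 200}) == 95
assert min_commit_offset({}) is None
```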
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > 2.  If the console consumer reads and commits, its
> > > > > > > > > > > > committed offset will be used to calculate the min
> > > > > > > > > > > > commit offset for this topic. We can avoid such random
> > > > > > > > > > > > consumers using the method Becket suggested.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > 3. It will not delete the log immediately; the log will
> > > > > > > > > > > > stay for some time (retention.commitoffset.ms), and
> > > > > > > > > > > > after that we only delete the log segments whose offsets
> > > > > > > > > > > > are less than the min commit offset. So the user can
> > > > > > > > > > > > still rewind its offset within log.retention.ms.
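Points 1-3 combine into a deletion rule along these lines. This is a sketch under the names used in this thread: `retention.commitoffset.ms` is the proposed config, while the helper itself is hypothetical, not broker code.

```python
def should_delete(segment_last_offset, segment_age_ms,
                  min_commit_offset, retention_commitoffset_ms):
    """Delete a segment only if every offset in it is below the min commit
    offset AND the grace period has passed, so consumers can still rewind."""
    if min_commit_offset is None:
        return False                     # no group has committed; keep everything
    fully_consumed = segment_last_offset < min_commit_offset
    return fully_consumed and segment_age_ms >= retention_commitoffset_ms

assert should_delete(90, 3_600_000, 95, 1_800_000) is True
assert should_delete(120, 3_600_000, 95, 1_800_000) is False  # not yet consumed
assert should_delete(90, 600_000, 95, 1_800_000) is False     # grace period
```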
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > David
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > ------------------ 原始邮件 ------------------
> > > > > > > > > > > > 发件人: "Mayuresh Gharat";<gh...@gmail.com>;
> > > > > > > > > > > > 发送时间: 2016年10月19日(星期三) 上午10:25
> > > > > > > > > > > > 收件人: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > > >
> > > > > > > > > > > > 主题: Re: [DISCUSS] KIP-68 Add a consumed log retention
> > > > before
> > > > > > log
> > > > > > > > > > > retention
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Hi David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > > >
> > > > > > > > > > > > I had some questions/suggestions :
> > > > > > > > > > > >
> > > > > > > > > > > > It would be great if you can explain with an example
> > > about
> > > > > how
> > > > > > > the
> > > > > > > > > min
> > > > > > > > > > > > offset for all the consumers will be calculated, in
> the
> > > > KIP.
> > > > > > > > > > > > What I meant was, it would be great to understand
> with
> > a
> > > > > pseudo
> > > > > > > > > > > > code/workflow if possible, how each broker knows all
> > the
> > > > > > > consumers
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > > given topic-partition and how the min is calculated.
> > > > > > > > > > > >
> > > > > > > > > > > > Also it would be good to understand what happens if
> we
> > > > start
> > > > > a
> > > > > > > > > console
> > > > > > > > > > > > consumer which would actually start reading from the
> > > > > beginning
> > > > > > > > offset
> > > > > > > > > > and
> > > > > > > > > > > > commit and crash immediately. How will the segments
> get
> > > > > > deleted?
> > > > > > > > > > > >
> > > > > > > > > > > > Will it delete all the log segments if all the
> > > > > > > > > > > > consumers have read till latest? If yes, would we be
> > > > > > > > > > > > able to handle a scenario where we say that the user
> > > > > > > > > > > > can rewind its offset to reprocess the data, since
> > > > > > > > > > > > log.retention.ms might not have been reached?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Mayuresh
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <
> > > > > > > becket.qin@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey David,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for replies to the questions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think one major thing still not clear at this
> point
> > > is
> > > > > that
> > > > > > > > > whether
> > > > > > > > > > > the
> > > > > > > > > > > > > brokers will only apply the consumed log retention
> > to a
> > > > > > > specific
> > > > > > > > > set
> > > > > > > > > > of
> > > > > > > > > > > > > interested consumer groups, or it does not have
> such
> > a
> > > > set
> > > > > of
> > > > > > > > > > consumer
> > > > > > > > > > > > > groups.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For example, for topic T, assume we know that there
> > > will
> > > > be
> > > > > > two
> > > > > > > > > > > > downstream
> > > > > > > > > > > > > consumer groups CG1 and CG2 consuming data from
> topic
> > > T.
> > > > > Will
> > > > > > > we
> > > > > > > > > add
> > > > > > > > > > a
> > > > > > > > > > > > > topic configuration such as
> > > > > > > > > > > > > "log.retention.commitoffset.interested.groups=CG1,CG2"
> > > > to
> > > > > > > topic
> > > > > > > > T
> > > > > > > > > so
> > > > > > > > > > > > that
> > > > > > > > > > > > > the brokers only care about CG1 and CG2. The
> > committed
> > > > > > offsets
> > > > > > > of
> > > > > > > > > > other
> > > > > > > > > > > > > groups are not interested and won't have any impact
> > on
> > > > the
> > > > > > > > > committed
> > > > > > > > > > > > offset
> > > > > > > > > > > > > based log retention.
> > > > > > > > > > > > >
> > > > > > > > > > > > > It seems the current proposal does not have an
> > > > "interested
> > > > > > > > consumer
> > > > > > > > > > > group
> > > > > > > > > > > > > set" configuration, so that means any random
> consumer
> > > > group
> > > > > > may
> > > > > > > > > > affect
> > > > > > > > > > > > the
> > > > > > > > > > > > > committed offset based log retention.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think the committed offset based log retention
> > seems
> > > > more
> > > > > > > > useful
> > > > > > > > > in
> > > > > > > > > > > > cases
> > > > > > > > > > > > > where we already know which consumer groups will be
> > > > > consuming
> > > > > > > > from
> > > > > > > > > > this
> > > > > > > > > > > > > topic, so we will only wait for those consumer
> groups
> > > but
> > > > > > > ignore
> > > > > > > > > the
> > > > > > > > > > > > > others. If a topic will be consumed by many unknown
> > or
> > > > > > > > > unpredictable
> > > > > > > > > > > > > consumer groups, it seems the existing time based
> log
> > > > > > retention
> > > > > > > > is
> > > > > > > > > > much
> > > > > > > > > > > > > simpler and clearer. So I would argue we don't
> > need
> > > > to
> > > > > > > > address
> > > > > > > > > > the
> > > > > > > > > > > > case
> > > > > > > > > > > > > that some groups may come later in the committed
> > offset
> > > > > based
> > > > > > > > > > > retention.
> > > > > > > > > > > > >
> > > > > > > > > > > > > That said, there may still be value to keep the
> data
> > > for
> > > > > some
> > > > > > > > time
> > > > > > > > > > even
> > > > > > > > > > > > > after all the interested consumer groups have
> > consumed
> > > > the
> > > > > > > > > messages.
> > > > > > > > > > > For
> > > > > > > > > > > > > example, in a pipelined stream processing DAG, we
> may
> > > > want
> > > > > to
> > > > > > > > keep
> > > > > > > > > > the
> > > > > > > > > > > > data
> > > > > > > > > > > > > of an intermediate topic for some time in case the
> > job
> > > > > fails.
> > > > > > > So
> > > > > > > > we
> > > > > > > > > > can
> > > > > > > > > > > > > resume from a previously succeeded stage instead of
> > > > restart
> > > > > > the
> > > > > > > > > > entire
> > > > > > > > > > > > > pipeline. Or we can use the intermediate topic for
> > some
> > > > > > > debugging
> > > > > > > > > > work.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <
> > > 254479818@qq.com>
> > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > > >     The KIP is used to solve both these 2 cases,
> we
> > > > > > specify a
> > > > > > > > > small
> > > > > > > > > > > > > > consumed log retention time to delete the
> consumed
> > > > data
> > > > > > and
> > > > > > > > > avoid
> > > > > > > > > > > > losing
> > > > > > > > > > > > > > un-consumed data.
> > > > > > > > > > > > > > And we specify a large forced log retention time
> > > > > > > > > > > > > > used as a higher bound for the data. I will update
> > > > > > > > > > > > > > the KIP with this info.
> > > > > > > > > > > > > >     Another solution that I think may be ok is to
> > > > > > > > > > > > > > support an API to delete the inactive group. If the
> > > > > > > > > > > > > > group is inactive, but its commit offset is still in
> > > > > > > > > > > > > > the __consumer_offsets topic and stays in the offset
> > > > > > > > > > > > > > cache, we can delete it via this API.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > David
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ------------------ 原始邮件 ------------------
> > > > > > > > > > > > > > 发件人: "Dong Lin";<li...@gmail.com>;
> > > > > > > > > > > > > > 发送时间: 2016年10月14日(星期五) 凌晨5:01
> > > > > > > > > > > > > > 收件人: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 主题: Re: [DISCUSS] KIP-68 Add a consumed log
> > retention
> > > > > > before
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As explained in the motivation section of the
> KIP,
> > > the
> > > > > > > problem
> > > > > > > > is
> > > > > > > > > > > that
> > > > > > > > > > > > if
> > > > > > > > > > > > > > log retention is too small, we may lose data; and
> > if
> > > > log
> > > > > > > > > retention
> > > > > > > > > > is
> > > > > > > > > > > > too
> > > > > > > > > > > > > > large, then we waste disk space. Therefore, we
> > > > > > > > > > > > > > need to solve one of the two
> > > > > > > > > > > > > > problems -- allow data to be persisted longer for
> > > > > > consumption
> > > > > > > > if
> > > > > > > > > > log
> > > > > > > > > > > > > > retention is set too small, or allow data to be
> > > expired
> > > > > > > earlier
> > > > > > > > > if
> > > > > > > > > > > log
> > > > > > > > > > > > > > retention is too large. I think the KIP probably
> > > needs
> > > > to
> > > > > > > make
> > > > > > > > > this
> > > > > > > > > > > > clear
> > > > > > > > > > > > > > and explain which one is rejected and why. Note
> > that
> > > > the
> > > > > > > choice
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > > two
> > > > > > > > > > > > > > affects the solution -- if we want to address the
> > > first
> > > > > > > problem
> > > > > > > > > > then
> > > > > > > > > > > > > > log.retention.ms should be used as lower bound
> on
> > > the
> > > > > > actual
> > > > > > > > > > > retention
> > > > > > > > > > > > > > time, and if we want to address the second
> problem
> > > then
> > > > > the
> > > > > > > > > > > > > > log.retention.ms
> > > > > > > > > > > > > > should be used as higher bound on the actual
> > > retention
> > > > > > time.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In both cases, we probably need to figure out a
> way
> > > to
> > > > > > > > determine
> > > > > > > > > > > > "active
> > > > > > > > > > > > > > consumer group". Maybe we can compare the
> > > > > > > > time-since-last-commit
> > > > > > > > > > > > against
> > > > > > > > > > > > > a
> > > > > > > > > > > > > > threshold to determine this. In addition, the
> > > threshold
> > > > > can
> > > > > > > be
> > > > > > > > > > > > overridden
> > > > > > > > > > > > > > either per-topic or per-groupId. If we go along
> > this
> > > > > route,
> > > > > > > the
> > > > > > > > > > > > rejected
> > > > > > > > > > > > > > solution (per-topic vs. per-groupId) should
> > probably
> > > be
> > > > > > > > explained
> > > > > > > > > > in
> > > > > > > > > > > > the
> > > > > > > > > > > > > > KIP.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <
> > > > > > > > lindong28@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi David,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for your explanation. There still seems to
> > > > > > > > > > > > > > > be an issue with this solution. Please see my
> > > > > > > > > > > > > > > comment inline.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <
> > > > > 254479818@qq.com>
> > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hi Dong,
> > > > > > > > > > > > > > >>     Sorry for the delay, here are the
> comments:
> > > > > > > > > > > > > > >> 1. I think we should distinguish these two
> cases:
> > > > > > > > > > > > > > >> (1) group has no member, but has commit
> offset :
> > > In
> > > > > > this
> > > > > > > > case
> > > > > > > > > > we
> > > > > > > > > > > > > should
> > > > > > > > > > > > > > >> consider its commit offset
> > > > > > > > > > > > > > >> (2) group has no member, no commit offset:
> Skip
> > > > this
> > > > > > > group
> > > > > > > > > > > > > > >> Is it ok?
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> ListGroup API can list the groups,  but this
> API
> > > > only
> > > > > > show
> > > > > > > > the
> > > > > > > > > > > > Online
> > > > > > > > > > > > > > >> Group, so we should enhance the listGroup API
> to
> > > > list
> > > > > > > those
> > > > > > > > > > groups
> > > > > > > > > > > > in
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> case (1)
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Say some user starts a consumer to consume
> > topic A
> > > > > with
> > > > > > > > > > > > > > > enable.auto.commit = true. Later they change
> the
> > > > group
> > > > > > name
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > > config.
> > > > > > > > > > > > > > > Then the proposed solution will never execute
> > > > consumed
> > > > > > log
> > > > > > > > > > > retention
> > > > > > > > > > > > > for
> > > > > > > > > > > > > > > the topic A, right? I think group name change
> is
> > > > pretty
> > > > > > > > common
> > > > > > > > > > and
> > > > > > > > > > > we
> > > > > > > > > > > > > > > should take care of this issue. One possible
> > > solution
> > > > > is
> > > > > > to
> > > > > > > > > add a
> > > > > > > > > > > > > config
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > specify the maximum time since last offset
> commit
> > > > > before
> > > > > > we
> > > > > > > > > > > consider
> > > > > > > > > > > > a
> > > > > > > > > > > > > > > group is inactive.
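The suggested time-since-last-commit threshold might look like this. Purely illustrative: `max_commit_age_ms` is a hypothetical config name, not an actual Kafka setting.

```python
def active_groups(last_commit_ms, now_ms, max_commit_age_ms):
    """Groups whose last offset commit is recent enough to count as active;
    stale groups (e.g. a one-off console consumer) are ignored."""
    return {group for group, t in last_commit_ms.items()
            if now_ms - t <= max_commit_age_ms}

commits = {"CG1": 9_000, "debug-console-tool": 1_000}
assert active_groups(commits, 10_000, 5_000) == {"CG1"}
```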
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> 2. Because every consumer group may appear in
> > > > > different
> > > > > > > > time,
> > > > > > > > > > say,
> > > > > > > > > > > > > group
> > > > > > > > > > > > > > >> 1 start to consume in day 1, group 2 start to
> > > > consume
> > > > > in
> > > > > > > day
> > > > > > > > > 2.
> > > > > > > > > > > If
> > > > > > > > > > > > we
> > > > > > > > > > > > > > >> delete the log segment right away,
> > > > > > > > > > > > > > >> group 2 cannot consume these messages.  So we
> > hope
> > > > the
> > > > > > > > > messages
> > > > > > > > > > > can
> > > > > > > > > > > > > hold
> > > > > > > > > > > > > > >> for a specified time.  I think many use-cases
> > will
> > > > > need
> > > > > > > > these
> > > > > > > > > > > > configs,
> > > > > > > > > > > > > > if
> > > > > > > > > > > > > > >> there are many consumer groups.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > If we want to take care of group 2, can we
> simply
> > > > > disable
> > > > > > > > > > consumed
> > > > > > > > > > > > log
> > > > > > > > > > > > > > > retention for the topic and set log retention
> to
> > 1
> > > > day?
> > > > > > Can
> > > > > > > > you
> > > > > > > > > > > > explain
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > benefit of enabling consumed log retention and
> > set
> > > > > > consumed
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > > > to 1 day?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Currently the flow graph in the KIP suggests
> that
> > > we
> > > > > > delete
> > > > > > > > > data
> > > > > > > > > > > iff
> > > > > > > > > > > > > > > (consumed log retention is triggered OR forced
> > log
> > > > > > > retention
> > > > > > > > is
> > > > > > > > > > > > > > triggered).
> > > > > > > > > > > > > > > An alternative solution is to delete data iff
> (
> > > > > > (consumed
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > disabled OR consumed log retention is
> triggered)
> > > AND
> > > > > > forced
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > > > is triggered). I would argue that the 2nd
> scheme
> > is
> > > > > > better.
> > > > > > > > Say
> > > > > > > > > > the
> > > > > > > > > > > > > > > consumed log retention is enabled. The 1st
> scheme
> > > > > > basically
> > > > > > > > > > > > interprets
> > > > > > > > > > > > > > > forced log retention as the upper bound of the
> > time
> > > > the
> > > > > > > data
> > > > > > > > > can
> > > > > > > > > > > stay
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > Kafka. The 2nd scheme interprets forced log
> > > retention
> > > > > as
> > > > > > > the
> > > > > > > > > > lower
> > > > > > > > > > > > > bound
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > the time the data can stay in Kafka, which is
> > more
> > > > > > > consistent
> > > > > > > > > > with
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > purpose of having this forced log retention (to
> > > save
> > > > > disk
> > > > > > > > > space).
> > > > > > > > > > > And
> > > > > > > > > > > > > if
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > adopt the 2nd solution, the use-case you
> > suggested
> > > > can
> > > > > be
> > > > > > > > > easily
> > > > > > > > > > > > > > addressed
> > > > > > > > > > > > > > > by setting forced log retention to 1 day and
> > enable
> > > > > > > consumed
> > > > > > > > > log
> > > > > > > > > > > > > > retention.
> > > > > > > > > > > > > > > What do you think?
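The two schemes contrasted above can be written side by side as boolean predicates. This is a sketch only, not broker code.

```python
def scheme_1(consumed_triggered, forced_triggered):
    """Forced retention as an UPPER bound: either trigger deletes."""
    return consumed_triggered or forced_triggered

def scheme_2(consumed_enabled, consumed_triggered, forced_triggered):
    """Forced retention as a LOWER bound: data never leaves before it."""
    return (not consumed_enabled or consumed_triggered) and forced_triggered

# All groups done, but forced retention (e.g. 1 day) not yet reached:
assert scheme_1(True, False) is True         # 1st scheme deletes early
assert scheme_2(True, True, False) is False  # 2nd scheme keeps the full day
```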
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >> David
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> ------------------ 原始邮件 ------------------
> > > > > > > > > > > > > > >> 发件人: "Dong Lin";<li...@gmail.com>;
> > > > > > > > > > > > > > >> 发送时间: 2016年10月10日(星期一) 下午4:05
> > > > > > > > > > > > > > >> 收件人: "dev"<de...@kafka.apache.org>;
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> 主题: Re: [DISCUSS] KIP-68 Add a consumed log
> > > > retention
> > > > > > > before
> > > > > > > > > log
> > > > > > > > > > > > > > retention
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Hey David,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks for reply. Please see comment inline.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L)
> <
> > > > > > > > > > > > pengwei.li@huawei.com>
> > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hi Dong
> > > > > > > > > > > > > > >> >    Thanks for the questions:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > 1.  Now we don't distinguish inactive or
> > active
> > > > > > groups.
> > > > > > > > > > Because
> > > > > > > > > > > in
> > > > > > > > > > > > > > some
> > > > > > > > > > > > > > >> > case maybe inactive group will become active
> > > > again,
> > > > > > and
> > > > > > > > > using
> > > > > > > > > > > the
> > > > > > > > > > > > > > >> previous
> > > > > > > > > > > > > > >> > commit offset.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > So we will not delete the log segment in the
> > > > > consumer
> > > > > > > > > > retention
> > > > > > > > > > > if
> > > > > > > > > > > > > > there
> > > > > > > > > > > > > > >> > are some groups consume but not commit, but
> > the
> > > > log
> > > > > > > > segment
> > > > > > > > > > can
> > > > > > > > > > > be
> > > > > > > > > > > > > > >> delete by
> > > > > > > > > > > > > > >> >      the force retention.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> So in the example I provided, the consumed log
> > > > > retention
> > > > > > > > will
> > > > > > > > > be
> > > > > > > > > > > > > > >> effectively disabled, right? This seems to be
> a
> > > real
> > > > > > > problem
> > > > > > > > > in
> > > > > > > > > > > > > > operation
> > > > > > > > > > > > > > >> -- we don't want log retention to be
> > > > un-intentionally
> > > > > > > > disabled
> > > > > > > > > > > > simply
> > > > > > > > > > > > > > >> because someone start a tool to consume from
> > that
> > > > > topic.
> > > > > > > > > Either
> > > > > > > > > > > this
> > > > > > > > > > > > > KIP
> > > > > > > > > > > > > > >> should provide a way to handle this, or there
> > > should
> > > > > be
> > > > > > a
> > > > > > > > way
> > > > > > > > > > for
> > > > > > > > > > > > > > operator
> > > > > > > > > > > > > > >> to be aware of such case and be able to
> > re-enable
> > > > > > consumed
> > > > > > > > log
> > > > > > > > > > > > > retention
> > > > > > > > > > > > > > >> for the topic. What do you think?
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > 2.  These configs are used to determine the
> > out
> > > of
> > > > > > date
> > > > > > > > time
> > > > > > > > > > of
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > consumed retention, like the parameters of
> the
> > > > force
> > > > > > > > > retention
> > > > > > > > > > > > > > >> > (log.retention.hours, log.retention.minutes,
> > > > > > > > > log.retention.ms
> > > > > > > > > > ).
> > > > > > > > > > > > For
> > > > > > > > > > > > > > >> > example, users want to save the log for 3
> > days,
> > > > > > after 3
> > > > > > > > > days,
> > > > > > > > > > > > kafka
> > > > > > > > > > > > > > >> will
> > > > > > > > > > > > > > >> > delete the log segments which are
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > consumed by all the consumer groups.  The log
> > > > > retention
> > > > > > > > > thread
> > > > > > > > > > > need
> > > > > > > > > > > > > > these
> > > > > > > > > > > > > > >> > parameters.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > It makes sense to have configs such as
> > > > > > log.retention.ms
> > > > > > > > --
> > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > > > used
> > > > > > > > > > > > > > >> to
> > > > > > > > > > > > > > >> make data available for up to a configured
> > amount
> > > of
> > > > > > time
> > > > > > > > > before
> > > > > > > > > > > it
> > > > > > > > > > > > is
> > > > > > > > > > > > > > >> deleted. My question is what is the use-case
> for
> > > > > making
> > > > > > > log
> > > > > > > > > > > > available
> > > > > > > > > > > > > > for
> > > > > > > > > > > > > > >> another e.g. 3 days after it has been consumed
> > by
> > > > all
> > > > > > > > consumer
> > > > > > > > > > > > groups.
> > > > > > > > > > > > > > The
> > > > > > > > > > > > > > >> purpose of this KIP is to allow log to be
> > deleted
> > > > > right
> > > > > > as
> > > > > > > > > long
> > > > > > > > > > as
> > > > > > > > > > > > all
> > > > > > > > > > > > > > >> interested consumer groups have consumed it.
> Can
> > > you
> > > > > > > > provide a
> > > > > > > > > > > > > use-case
> > > > > > > > > > > > > > >> for
> > > > > > > > > > > > > > >> keeping log available for longer time after it
> > has
> > > > > been
> > > > > > > > > consumed
> > > > > > > > > > > by
> > > > > > > > > > > > > all
> > > > > > > > > > > > > > >> groups?
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > > >> > David
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > Hey David,
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Thanks for the KIP. Can you help with the
> > > > > following
> > > > > > > two
> > > > > > > > > > > > questions:
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > 1) If someone starts a consumer (e.g.
> > > > > > > > > kafka-console-consumer)
> > > > > > > > > > > to
> > > > > > > > > > > > > > >> consume a
> > > > > > > > > > > > > > >> > > topic for debug/validation purpose, a
> > randome
> > > > > > consumer
> > > > > > > > > group
> > > > > > > > > > > may
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > >> > created
> > > > > > > > > > > > > > >> > > and offset may be committed for this
> > consumer
> > > > > group.
> > > > > > > If
> > > > > > > > no
> > > > > > > > > > > > offset
> > > > > > > > > > > > > > >> commit
> > > > > > > > > > > > > > >> > is
> > > > > > > > > > > > > > >> > > made for this consumer group in the
> future,
> > > will
> > > > > > this
> > > > > > > > > > > > effectively
> > > > > > > > > > > > > > >> > > disable consumed log retention for this
> > topic?
> > > > In
> > > > > > > other
> > > > > > > > > > words,
> > > > > > > > > > > > how
> > > > > > > > > > > > > > do
> > > > > > > > > > > > > > >> > this
> > > > > > > > > > > > > > >> > > KIP distinguish active consumer group from
> > > > > inactive
> > > > > > > > ones?
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > 2) Why do we need new configs such as
> > > > > > > > > > > > > log.retention.commitoffset.hou
> > > > > > > > > > > > > > >> rs?
> > > > > > > > > > > > > > >> > Can
> > > > > > > > > > > > > > >> > >we simply delete log segments if consumed
> log
> > > > > > retention
> > > > > > > > is
> > > > > > > > > > > > enabled
> > > > > > > > > > > > > > for
> > > > > > > > > > > > > > >> > this
> > > > > > > > > > > > > > >> > > topic and all consumer groups have
> consumed
> > > > > messages
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > log
> > > > > > > > > > > > > > >> segment?
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > > > >> > > Dong
> > > > > > > > > > > > > > >> > >
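The stale-group pitfall in question 1 -- a one-off kafka-console-consumer group whose old committed offset pins retention forever -- could in principle be handled by ignoring groups whose last commit is older than some cutoff. A sketch of that idea (the cutoff and helper name are hypothetical, not part of the KIP or of Kafka):

```python
import time
from typing import Optional

def effective_min_offset(
    commits: dict,             # group id -> (committed offset, last commit unix time)
    inactive_after_s: float,   # hypothetical staleness cutoff, not a real Kafka config
    now: Optional[float] = None,
) -> Optional[int]:
    """Minimum committed offset across *active* groups only; a group with no
    commit in the last inactive_after_s seconds no longer holds back retention."""
    now = time.time() if now is None else now
    active = [off for off, ts in commits.values() if now - ts <= inactive_after_s]
    return min(active) if active else None

commits = {
    "prod-app": (500, 1_000_000.0),      # committed recently
    "console-consumer-1234": (10, 0.0),  # one-off debug group, long inactive
}
print(effective_min_offset(commits, inactive_after_s=86_400, now=1_000_010.0))  # 500
```

Without such a cutoff, the debug group's offset of 10 would be the effective minimum and no consumed segment past offset 10 could ever be deleted.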
> > > On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <pengwei.li@huawei.com> wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > Thanks for the feedback:
> > > > 1. We use the simple consumer API to query the committed offset, so we
> > > > don't need to specify the consumer group.
> > > > 2. Every broker uses the simple consumer API (OffsetFetchKey) to query
> > > > the committed offsets during the log retention process. The client may
> > > > or may not commit offsets.
> > > > 3. There is no need to distinguish follower brokers from leader
> > > > brokers; every broker can query.
> > > > 4. We don't need to change any protocols; we mainly change the log
> > > > retention process in the log manager.
> > > >
> > > > One concern is that querying the min offset needs O(partitions *
> > > > groups) time complexity. An alternative is to build an internal topic
> > > > to save every partition's min offset, which reduces this to O(1).
> > > > I will update the wiki with more details.
> > > >
> > > > Thanks,
> > > > David
> > > >
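The O(partitions * groups) scan versus the O(1) internal-topic alternative David mentions can be contrasted in a sketch (the data structures are illustrative only; in Kafka the running min-offset map could live in a compacted internal topic, as suggested). The brute-force approach re-scans every group's commits on each retention pass, whereas maintaining the per-partition minimum at commit time makes the retention-time lookup constant:

```python
# Brute force: every retention pass scans all group commits -> O(groups) per partition.
def min_offset_scan(commits: dict, partition: str) -> int:
    """commits: group id -> {partition -> committed offset}."""
    return min(g[partition] for g in commits.values() if partition in g)

# Alternative: keep a running min per partition, updated on each offset commit
# (in Kafka this map could be persisted in a compacted internal topic).
class MinOffsetCache:
    def __init__(self) -> None:
        self._per_group: dict = {}  # group id -> {partition -> offset}
        self._min: dict = {}        # partition -> min committed offset

    def on_commit(self, group: str, partition: str, offset: int) -> None:
        self._per_group.setdefault(group, {})[partition] = offset
        # Recompute only this partition's minimum: the cost moves to the
        # commit path, keeping the retention pass O(1) per partition.
        self._min[partition] = min(
            g[partition] for g in self._per_group.values() if partition in g
        )

    def min_offset(self, partition: str) -> int:  # O(1) lookup at retention time
        return self._min[partition]

cache = MinOffsetCache()
cache.on_commit("g1", "t-0", 120)
cache.on_commit("g2", "t-0", 90)
print(cache.min_offset("t-0"))  # 90
```

The trade-off is that the commit path does a little more work and the cached map must survive broker restarts, which is why a compacted topic (rather than in-memory state alone) is the natural home for it.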
> > > > > Hi Pengwei,
> > > > >
> > > > > Thanks for the KIP proposal. It is a very useful KIP. At a high
> > > > > level, the proposed behavior looks reasonable to me.
> > > > >
> > > > > However, it seems that some of the details are not mentioned in the
> > > > > KIP. For example:
> > > > >
> > > > > 1. How will the expected consumer groups be specified? Is it through
> > > > > a per-topic dynamic configuration?
> > > > > 2. How do the brokers detect the consumer offsets? Is a consumer
> > > > > required to commit offsets?
> > > > > 3. How do all the replicas learn about the committed offsets? E.g.
> > > > > 1) non-coordinator brokers which do not have the committed offsets,
> > > > > and 2) follower brokers which do not have consumers directly
> > > > > consuming from them.
> > > > > 4. Are any other changes needed (e.g. new protocols) in addition to
> > > > > the configuration change?
> > > > >
> > > > > It would be great if you can update the wiki with more details.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) <pengwei.li@huawei.com> wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I have made a KIP to enhance the log retention, details as follows:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > > > >
> > > > > > Now starting a discussion thread for this KIP; looking forward to
> > > > > > the feedback.
> > > > > >
> > > > > > Thanks,
> > > > > > David
> > > > > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> >
> > --
> > -- Guozhang
>