You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Dong Lin <li...@gmail.com> on 2017/01/03 22:45:25 UTC

[DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Hi all,

We created KIP-107 to propose addition of purgeDataBefore() API in
AdminClient.

Please find the KIP wiki in the link https://iwww.corp.linkedin.
com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal. We
would love to hear your comments and suggestions.

Thanks,
Dong

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hi Mayuresh,

low_watermark will be updated when log retention fires on the broker. It
may also be updated on the follower when follower receives FetchResponse
from leader; and it may be updated on the leader when leader receives
PurgeRequest from admin client.

Thanks,
Dong

On Wed, Jan 11, 2017 at 7:37 AM, Mayuresh Gharat <gharatmayuresh15@gmail.com
> wrote:

> Hi Dong,
>
> As per  "If the message's offset is below low_watermark,
> then it should have been deleted by log retention policy."
> ---> I am not sure if  I understand this correctly. Do you mean to say that
> the low_watermark will be updated only when the log retention fires on the
> broker?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jan 10, 2017 at 2:56 PM, Dong Lin <li...@gmail.com> wrote:
>
> > Bump up. I am going to initiate the vote If there is no further concern
> > with the KIP.
> >
> > On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <li...@gmail.com> wrote:
> >
> > > Hey Mayuresh,
> > >
> > > Thanks for the comment. If the message's offset is below low_watermark,
> > > then it should have been deleted by log retention policy. Thus it is OK
> > not
> > > to expose this message to consumer. Does this answer your question?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <
> > > gharatmayuresh15@gmail.com> wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> I had a question (which might have been answered before).
> > >>
> > >> 1) The KIP says that the low_water_mark will be updated periodically
> by
> > >> the
> > >> broker like high_water_mark.
> > >> Essentially we want to use low_water_mark for cases where an entire
> > >> segment
> > >> cannot be deleted because may be the segment_start_offset <
> PurgeOffset
> > <
> > >> segment_end_offset, in which case we will set the low_water_mark to
> > >> PurgeOffset+1.
> > >>
> > >> 2) The KIP also says that messages below low_water_mark will not be
> > >> exposed
> > >> for consumers, which does make sense since we want say that data below
> > >> low_water_mark is purged.
> > >>
> > >> Looking at above conditions, does it make sense not to update the
> > >> low_water_mark periodically but only on PurgeRequest?
> > >> The reason being, if we update it periodically then as per 2) we will
> > not
> > >> be allowing consumers to re-consume data that is not purged but is
> below
> > >> low_water_mark.
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >>
> > >> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <li...@gmail.com>
> wrote:
> > >>
> > >> > Hey Jun,
> > >> >
> > >> > Thanks for reviewing the KIP!
> > >> >
> > >> > 1. The low_watermark will be checkpointed in a new file named
> > >> >  "replication-low-watermark-checkpoint". It will have the same
> format
> > >> as
> > >> > the existing replication-offset-checkpoint file. This allows us the
> > keep
> > >> > the existing format of checkpoint files which maps TopicPartition to
> > >> Long.
> > >> > I just updated the "Public Interface" section in the KIP wiki to
> > explain
> > >> > this file.
> > >> >
> > >> > 2. I think using low_watermark from leader to trigger log retention
> in
> > >> the
> > >> > follower will work correctly in the sense that all messages with
> > offset
> > >> <
> > >> > low_watermark can be deleted. But I am not sure that the efficiency
> is
> > >> the
> > >> > same, i.e. offset of messages which should be deleted (i.e. due to
> > time
> > >> or
> > >> > size-based log retention policy) will be smaller than low_watermark
> > from
> > >> > the leader.
> > >> >
> > >> > For example, say both the follower and the leader have messages with
> > >> > offsets in range [0, 2000]. If the follower does log rolling
> slightly
> > >> later
> > >> > than leader, the segments on follower would be [0, 1001], [1002,
> 2000]
> > >> and
> > >> > segments on leader would be [0, 1000], [1001, 2000]. After leader
> > >> deletes
> > >> > the first segment, the low_watermark would be 1001. Thus the first
> > >> segment
> > >> > would stay on follower's disk unnecessarily which may double disk
> > usage
> > >> at
> > >> > worst.
> > >> >
> > >> > Since this approach doesn't save us much, I am inclined to not
> include
> > >> this
> > >> > change to keep the KIP simple.
> > >> >
> > >> > Dong
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:
> > >> >
> > >> > > Hi, Dong,
> > >> > >
> > >> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >> > >
> > >> > > 1. Where is the low_watermark checkpointed? Is that
> > >> > > in replication-offset-checkpoint? If so, do we need to bump up the
> > >> > version?
> > >> > > Could you also describe the format change?
> > >> > >
> > >> > > 2. For topics with "delete" retention, currently we let each
> replica
> > >> > delete
> > >> > > old segments independently. With low_watermark, we could just let
> > >> leaders
> > >> > > delete old segments through the deletion policy and the followers
> > will
> > >> > > simply delete old segments based on low_watermark. Not sure if
> this
> > >> saves
> > >> > > much, but is a potential option that may be worth thinking about.
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenblatt@gmail.com
> >
> > >> > wrote:
> > >> > >
> > >> > > > one more example of complicated config - mirror maker.
> > >> > > >
> > >> > > > we definitely cant trust each and every topic owner to configure
> > >> their
> > >> > > > topics not to purge before they've been mirrored.
> > >> > > > which would mean there's a per-topic config (set by the owner)
> > and a
> > >> > > > "global" config (where mirror makers are specified) and they
> need
> > >> to be
> > >> > > > "merged".
> > >> > > > for those topics that _are_ mirrored.
> > >> > > > which is a changing set of topics thats stored in an external
> > system
> > >> > > > outside of kafka.
> > >> > > > if a topic is taken out of the mirror set the MM offset would be
> > >> > "frozen"
> > >> > > > at that point and prevent clean-up for all eternity, unless its
> > >> > > cleaned-up
> > >> > > > itself.
> > >> > > >
> > >> > > > ...
> > >> > > >
> > >> > > > complexity :-)
> > >> > > >
> > >> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <
> radai.rosenblatt@gmail.com
> > >
> > >> > > wrote:
> > >> > > >
> > >> > > > > in summary - i'm not opposed to the idea of a per-topic clean
> up
> > >> > config
> > >> > > > > that tracks some set of consumer groups' offsets (which would
> > >> > probably
> > >> > > > work
> > >> > > > > for 80% of use cases), but i definitely see a need to expose a
> > >> simple
> > >> > > API
> > >> > > > > for the more advanced/obscure/custom use cases (the other
> 20%).
> > >> > > > >
> > >> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <
> > radai.rosenblatt@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >
> > >> > > > >> a major motivation for this KIP is cost savings.
> > >> > > > >>
> > >> > > > >> lots of internal systems at LI use kafka as an intermediate
> > pipe,
> > >> > and
> > >> > > > set
> > >> > > > >> the topic retention period to a "safe enough" amount of time
> to
> > >> be
> > >> > > able
> > >> > > > to
> > >> > > > >> recover from crashes/downtime and catch up to "now". this
> > results
> > >> > in a
> > >> > > > few
> > >> > > > >> days' worth of retention typically.
> > >> > > > >>
> > >> > > > >> however, under normal operating conditions the consumers are
> > >> mostly
> > >> > > > >> caught-up and so early clean-up enables a big cost savings in
> > >> > storage.
> > >> > > > >>
> > >> > > > >> as for my points:
> > >> > > > >>
> > >> > > > >> 1. when discussing implementation options for automatic
> > clean-up
> > >> we
> > >> > > > >> realized that cleaning up by keeping track of offsets stored
> in
> > >> > kafka
> > >> > > > >> requires some per-topic config - you need to specify which
> > >> groups to
> > >> > > > track.
> > >> > > > >> this becomes a problem because:
> > >> > > > >>     1.1 - relatively complicated code, to be written in the
> > >> broker.
> > >> > > > >>     1.2 - configuration needs to be maintained up to date by
> > >> topic
> > >> > > > >> "owners" - of which we have thousands. failure to do so would
> > >> > decrease
> > >> > > > the
> > >> > > > >> cost benefit.
> > >> > > > >>     1.3 - some applications have a "reconsume" / "reinit" /
> > >> > > "bootstrap"
> > >> > > > >> workflow where they will reset their offsets to an earlier
> > value
> > >> > than
> > >> > > > the
> > >> > > > >> one stored. this means that a stored offset of X does not
> > always
> > >> > mean
> > >> > > > you
> > >> > > > >> can clean up to X-1. think of it as video encoding -some apps
> > >> have
> > >> > > "key
> > >> > > > >> frames" they may seek back to which are before their current
> > >> offset.
> > >> > > > >>     1.4 - there are multiple possible strategies - you could
> > >> clean
> > >> > up
> > >> > > > >> aggressively, retain some "time distance" from latest, some
> > >> "offset
> > >> > > > >> distance", etc. this we think would have made it very hard to
> > >> agree
> > >> > > on a
> > >> > > > >> single "correct" implementation that everyone would be happy
> > >> with.
> > >> > it
> > >> > > > would
> > >> > > > >> be better to include the raw functionality in the API and
> leave
> > >> the
> > >> > > > >> "brains" to an external monitoring system where people could
> > >> > > > custom-taylor
> > >> > > > >> their logic
> > >> > > > >>
> > >> > > > >> 2. ad-hoc consumer groups: its common practice for devs to
> spin
> > >> up
> > >> > > > >> console consumers and connect to a topic as a debug aid. SREs
> > may
> > >> > also
> > >> > > > do
> > >> > > > >> this. there are also various other eco-system applications
> that
> > >> may
> > >> > > > >> consumer from topics (unknown to topic owners as those are
> > infra
> > >> > > > monitoring
> > >> > > > >> tools). obviously such consumer-groups' offsets should be
> > ignored
> > >> > for
> > >> > > > >> purposes of clean-up, but coming up with a bullet-proof way
> to
> > do
> > >> > this
> > >> > > > is
> > >> > > > >> non-trivial and again ties with implementation complexity and
> > >> > > > inflexibility
> > >> > > > >> of a "one size fits all" solution in 1.4 above.
> > >> > > > >>
> > >> > > > >> 3. forceful clean-up: we have workflows that use kafka to
> move
> > >> > > gigantic
> > >> > > > >> blobs from offline hadoop processing flows into systems. the
> > data
> > >> > > being
> > >> > > > >> "loaded" into such an online system can be several GBs in
> side
> > >> and
> > >> > > take
> > >> > > > a
> > >> > > > >> long time to consume (they are sliced into many small msgs).
> > >> > sometimes
> > >> > > > the
> > >> > > > >> sender wants to abort and start a new blob before the current
> > >> load
> > >> > > > process
> > >> > > > >> has completed - meaning the consumer's offsets are not yet
> > caught
> > >> > up.
> > >> > > > >>
> > >> > > > >> 4. offsets outside of kafka: yes, you could force
> applications
> > to
> > >> > > store
> > >> > > > >> their offsets twice, but thats inefficient. its better to
> > expose
> > >> a
> > >> > > raw,
> > >> > > > >> simple API and let such applications manage their own
> clean-up
> > >> logic
> > >> > > > (this
> > >> > > > >> again ties into 1.4 and no "one size fits all" solution)
> > >> > > > >>
> > >> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <
> lindong28@gmail.com
> > >
> > >> > > wrote:
> > >> > > > >>
> > >> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> > >> > > > >>> ewen@confluent.io>
> > >> > > > >>> wrote:
> > >> > > > >>>
> > >> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <
> > lindong28@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >>> >
> > >> > > > >>> > > Hey Ewen,
> > >> > > > >>> > >
> > >> > > > >>> > > Thanks for the review. As Radai explained, it would be
> > >> complex
> > >> > in
> > >> > > > >>> terms
> > >> > > > >>> > of
> > >> > > > >>> > > user configuration if we were to use committed offset to
> > >> decide
> > >> > > > data
> > >> > > > >>> > > deletion. We need a way to specify which groups need to
> > >> consume
> > >> > > > data
> > >> > > > >>> of
> > >> > > > >>> > > this partition. The broker will also need to consume the
> > >> entire
> > >> > > > >>> offsets
> > >> > > > >>> > > topic in that approach which has some overhead. I don't
> > >> think
> > >> > it
> > >> > > is
> > >> > > > >>> that
> > >> > > > >>> > > hard to implement. But it will likely take more time to
> > >> discuss
> > >> > > > that
> > >> > > > >>> > > approach due to the new config and the server side
> > overhead.
> > >> > > > >>> > >
> > >> > > > >>> > > We choose to put this API in AdminClient because the API
> > is
> > >> > more
> > >> > > > >>> like an
> > >> > > > >>> > > administrative operation (such as listGroups,
> > deleteTopics)
> > >> > than
> > >> > > a
> > >> > > > >>> > consumer
> > >> > > > >>> > > operation. It is not necessarily called by consumer
> only.
> > >> For
> > >> > > > >>> example, we
> > >> > > > >>> > > can implement the "delete data before committed offset"
> > >> > approach
> > >> > > by
> > >> > > > >>> > running
> > >> > > > >>> > > an external service which calls purgeDataBefore() API
> > based
> > >> on
> > >> > > > >>> committed
> > >> > > > >>> > > offset of consumer groups.
> > >> > > > >>> > >
> > >> > > > >>> > > I am not aware that AdminClient is not a public API.
> > >> Suppose it
> > >> > > is
> > >> > > > >>> not
> > >> > > > >>> > > public now, I assume we plan to make it public in the
> > >> future as
> > >> > > > part
> > >> > > > >>> of
> > >> > > > >>> > > KIP-4. Are we not making it public because its interface
> > is
> > >> not
> > >> > > > >>> stable?
> > >> > > > >>> > If
> > >> > > > >>> > > so, can we just tag this new API as not stable in the
> > code?
> > >> > > > >>> > >
> > >> > > > >>> >
> > >> > > > >>> >
> > >> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> > >> > > implementation.
> > >> > > > >>> It's
> > >> > > > >>> > definitely confusing that both will be (could be?) named
> > >> > > AdminClient,
> > >> > > > >>> but
> > >> > > > >>> > we've kept the existing Scala AdminClient out of the
> public
> > >> API
> > >> > and
> > >> > > > >>> have
> > >> > > > >>> > not required KIPs for changes to it.
> > >> > > > >>> >
> > >> > > > >>> > That said, I agree that if this type of API makes it into
> > >> Kafka,
> > >> > > > >>> having a
> > >> > > > >>> > (new, Java-based) AdminClient method would definitely be a
> > >> good
> > >> > > idea.
> > >> > > > >>> An
> > >> > > > >>> > alternative path might be to have a Consumer-based
> > >> implementation
> > >> > > > since
> > >> > > > >>> > that seems like a very intuitive, natural way to use the
> > >> > protocol.
> > >> > > I
> > >> > > > >>> think
> > >> > > > >>> > optimizing for the expected use case would be a good idea.
> > >> > > > >>> >
> > >> > > > >>> > -Ewen
> > >> > > > >>> >
> > >> > > > >>> > Are you saying that the Scala AdminClient is not a public
> > API
> > >> and
> > >> > > we
> > >> > > > >>> discourage addition of any new feature to this class?
> > >> > > > >>>
> > >> > > > >>> I still prefer to add it to AdminClient (Java version in the
> > >> future
> > >> > > and
> > >> > > > >>> Scala version in the short team) because I feel it belongs
> to
> > >> admin
> > >> > > > >>> operation instead of KafkaConsumer interface. For example,
> if
> > in
> > >> > the
> > >> > > > >>> future
> > >> > > > >>> we implement the "delete data before committed offset"
> > strategy
> > >> in
> > >> > an
> > >> > > > >>> external service, I feel it is a bit awkward if the service
> > has
> > >> to
> > >> > > > >>> instantiate a KafkaConsumer and call
> > >> KafkaConsumer.purgeDataBefore(
> > >> > > > ...)
> > >> > > > >>> to
> > >> > > > >>> purge data. In other words, our expected use-case doesn't
> > >> > necessarily
> > >> > > > >>> bind
> > >> > > > >>> this API with consumer.
> > >> > > > >>>
> > >> > > > >>> I am not strong on this issue. Let's see what other
> > >> > > > committers/developers
> > >> > > > >>> think about this.
> > >> > > > >>>
> > >> > > > >>>
> > >> > > > >>> >
> > >> > > > >>> > >
> > >> > > > >>> > > Thanks,
> > >> > > > >>> > > Dong
> > >> > > > >>> > >
> > >> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> > >> > > > >>> ewen@confluent.io
> > >> > > > >>> > >
> > >> > > > >>> > > wrote:
> > >> > > > >>> > >
> > >> > > > >>> > > > Dong,
> > >> > > > >>> > > >
> > >> > > > >>> > > > Looks like that's an internal link,
> > >> > > > >>> > > > https://cwiki.apache.org/confl
> > >> uence/display/KAFKA/KIP-107%
> > >> > > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > >> > > > >>> > > > is the right one.
> > >> > > > >>> > > >
> > >> > > > >>> > > > I have a question about one of the rejected
> > alternatives:
> > >> > > > >>> > > >
> > >> > > > >>> > > > > Using committed offset instead of an extra API to
> > >> trigger
> > >> > > data
> > >> > > > >>> purge
> > >> > > > >>> > > > operation.
> > >> > > > >>> > > >
> > >> > > > >>> > > > The KIP says this would be more complicated to
> > implement.
> > >> Why
> > >> > > is
> > >> > > > >>> that?
> > >> > > > >>> > I
> > >> > > > >>> > > > think brokers would have to consume the entire offsets
> > >> topic,
> > >> > > but
> > >> > > > >>> the
> > >> > > > >>> > > data
> > >> > > > >>> > > > stored in memory doesn't seem to change and applying
> > this
> > >> > when
> > >> > > > >>> updated
> > >> > > > >>> > > > offsets are seen seems basically the same. It might
> also
> > >> be
> > >> > > > >>> possible to
> > >> > > > >>> > > > make it work even with multiple consumer groups if
> that
> > >> was
> > >> > > > desired
> > >> > > > >>> > > > (although that'd require tracking more data in memory)
> > as
> > >> a
> > >> > > > >>> > > generalization
> > >> > > > >>> > > > without requiring coordination between the consumer
> > >> groups.
> > >> > > Given
> > >> > > > >>> the
> > >> > > > >>> > > > motivation, I'm assuming this was considered
> unnecessary
> > >> > since
> > >> > > > this
> > >> > > > >>> > > > specifically targets intermediate stream processing
> > >> topics.
> > >> > > > >>> > > >
> > >> > > > >>> > > > Another question is why expose this via AdminClient
> > (which
> > >> > > isn't
> > >> > > > >>> public
> > >> > > > >>> > > API
> > >> > > > >>> > > > afaik)? Why not, for example, expose it on the
> Consumer,
> > >> > which
> > >> > > is
> > >> > > > >>> > > > presumably where you'd want access to it since the
> > >> > > functionality
> > >> > > > >>> > depends
> > >> > > > >>> > > on
> > >> > > > >>> > > > the consumer actually having consumed the data?
> > >> > > > >>> > > >
> > >> > > > >>> > > > -Ewen
> > >> > > > >>> > > >
> > >> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <
> > >> > lindong28@gmail.com>
> > >> > > > >>> wrote:
> > >> > > > >>> > > >
> > >> > > > >>> > > > > Hi all,
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > We created KIP-107 to propose addition of
> > >> purgeDataBefore()
> > >> > > API
> > >> > > > >>> in
> > >> > > > >>> > > > > AdminClient.
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > Please find the KIP wiki in the link
> > >> > > > https://iwww.corp.linkedin.
> > >> > > > >>> > > > > com/wiki/cf/display/ENGS/Kafka
> > >> +purgeDataBefore%28%29+API+
> > >> > > > >>> > > > design+proposal.
> > >> > > > >>> > > > > We
> > >> > > > >>> > > > > would love to hear your comments and suggestions.
> > >> > > > >>> > > > >
> > >> > > > >>> > > > > Thanks,
> > >> > > > >>> > > > > Dong
> > >> > > > >>> > > > >
> > >> > > > >>> > > >
> > >> > > > >>> > >
> > >> > > > >>> >
> > >> > > > >>>
> > >> > > > >>
> > >> > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -Regards,
> > >> Mayuresh R. Gharat
> > >> (862) 250-7125
> > >>
> > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Mayuresh Gharat <gh...@gmail.com>.
Hi Dong,

As per  "If the message's offset is below low_watermark,
then it should have been deleted by log retention policy."
---> I am not sure if  I understand this correctly. Do you mean to say that
the low_watermark will be updated only when the log retention fires on the
broker?

Thanks,

Mayuresh

On Tue, Jan 10, 2017 at 2:56 PM, Dong Lin <li...@gmail.com> wrote:

> Bump up. I am going to initiate the vote If there is no further concern
> with the KIP.
>
> On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <li...@gmail.com> wrote:
>
> > Hey Mayuresh,
> >
> > Thanks for the comment. If the message's offset is below low_watermark,
> > then it should have been deleted by log retention policy. Thus it is OK
> not
> > to expose this message to consumer. Does this answer your question?
> >
> > Thanks,
> > Dong
> >
> > On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <
> > gharatmayuresh15@gmail.com> wrote:
> >
> >> Hi Dong,
> >>
> >> Thanks for the KIP.
> >>
> >> I had a question (which might have been answered before).
> >>
> >> 1) The KIP says that the low_water_mark will be updated periodically by
> >> the
> >> broker like high_water_mark.
> >> Essentially we want to use low_water_mark for cases where an entire
> >> segment
> >> cannot be deleted because may be the segment_start_offset < PurgeOffset
> <
> >> segment_end_offset, in which case we will set the low_water_mark to
> >> PurgeOffset+1.
> >>
> >> 2) The KIP also says that messages below low_water_mark will not be
> >> exposed
> >> for consumers, which does make sense since we want say that data below
> >> low_water_mark is purged.
> >>
> >> Looking at above conditions, does it make sense not to update the
> >> low_water_mark periodically but only on PurgeRequest?
> >> The reason being, if we update it periodically then as per 2) we will
> not
> >> be allowing consumers to re-consume data that is not purged but is below
> >> low_water_mark.
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >>
> >> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <li...@gmail.com> wrote:
> >>
> >> > Hey Jun,
> >> >
> >> > Thanks for reviewing the KIP!
> >> >
> >> > 1. The low_watermark will be checkpointed in a new file named
> >> >  "replication-low-watermark-checkpoint". It will have the same format
> >> as
> >> > the existing replication-offset-checkpoint file. This allows us the
> keep
> >> > the existing format of checkpoint files which maps TopicPartition to
> >> Long.
> >> > I just updated the "Public Interface" section in the KIP wiki to
> explain
> >> > this file.
> >> >
> >> > 2. I think using low_watermark from leader to trigger log retention in
> >> the
> >> > follower will work correctly in the sense that all messages with
> offset
> >> <
> >> > low_watermark can be deleted. But I am not sure that the efficiency is
> >> the
> >> > same, i.e. offset of messages which should be deleted (i.e. due to
> time
> >> or
> >> > size-based log retention policy) will be smaller than low_watermark
> from
> >> > the leader.
> >> >
> >> > For example, say both the follower and the leader have messages with
> >> > offsets in range [0, 2000]. If the follower does log rolling slightly
> >> later
> >> > than leader, the segments on follower would be [0, 1001], [1002, 2000]
> >> and
> >> > segments on leader would be [0, 1000], [1001, 2000]. After leader
> >> deletes
> >> > the first segment, the low_watermark would be 1001. Thus the first
> >> segment
> >> > would stay on follower's disk unnecessarily which may double disk
> usage
> >> at
> >> > worst.
> >> >
> >> > Since this approach doesn't save us much, I am inclined to not include
> >> this
> >> > change to keep the KIP simple.
> >> >
> >> > Dong
> >> >
> >> >
> >> >
> >> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:
> >> >
> >> > > Hi, Dong,
> >> > >
> >> > > Thanks for the proposal. Looks good overall. A couple of comments.
> >> > >
> >> > > 1. Where is the low_watermark checkpointed? Is that
> >> > > in replication-offset-checkpoint? If so, do we need to bump up the
> >> > version?
> >> > > Could you also describe the format change?
> >> > >
> >> > > 2. For topics with "delete" retention, currently we let each replica
> >> > delete
> >> > > old segments independently. With low_watermark, we could just let
> >> leaders
> >> > > delete old segments through the deletion policy and the followers
> will
> >> > > simply delete old segments based on low_watermark. Not sure if this
> >> saves
> >> > > much, but is a potential option that may be worth thinking about.
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > one more example of complicated config - mirror maker.
> >> > > >
> >> > > > we definitely cant trust each and every topic owner to configure
> >> their
> >> > > > topics not to purge before they've been mirrored.
> >> > > > which would mean there's a per-topic config (set by the owner)
> and a
> >> > > > "global" config (where mirror makers are specified) and they need
> >> to be
> >> > > > "merged".
> >> > > > for those topics that _are_ mirrored.
> >> > > > which is a changing set of topics thats stored in an external
> system
> >> > > > outside of kafka.
> >> > > > if a topic is taken out of the mirror set the MM offset would be
> >> > "frozen"
> >> > > > at that point and prevent clean-up for all eternity, unless its
> >> > > cleaned-up
> >> > > > itself.
> >> > > >
> >> > > > ...
> >> > > >
> >> > > > complexity :-)
> >> > > >
> >> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenblatt@gmail.com
> >
> >> > > wrote:
> >> > > >
> >> > > > > in summary - i'm not opposed to the idea of a per-topic clean up
> >> > config
> >> > > > > that tracks some set of consumer groups' offsets (which would
> >> > probably
> >> > > > work
> >> > > > > for 80% of use cases), but i definitely see a need to expose a
> >> simple
> >> > > API
> >> > > > > for the more advanced/obscure/custom use cases (the other 20%).
> >> > > > >
> >> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <
> radai.rosenblatt@gmail.com
> >> >
> >> > > > wrote:
> >> > > > >
> >> > > > >> a major motivation for this KIP is cost savings.
> >> > > > >>
> >> > > > >> lots of internal systems at LI use kafka as an intermediate
> pipe,
> >> > and
> >> > > > set
> >> > > > >> the topic retention period to a "safe enough" amount of time to
> >> be
> >> > > able
> >> > > > to
> >> > > > >> recover from crashes/downtime and catch up to "now". this
> results
> >> > in a
> >> > > > few
> >> > > > >> days' worth of retention typically.
> >> > > > >>
> >> > > > >> however, under normal operating conditions the consumers are
> >> mostly
> >> > > > >> caught-up and so early clean-up enables a big cost savings in
> >> > storage.
> >> > > > >>
> >> > > > >> as for my points:
> >> > > > >>
> >> > > > >> 1. when discussing implementation options for automatic
> clean-up
> >> we
> >> > > > >> realized that cleaning up by keeping track of offsets stored in
> >> > kafka
> >> > > > >> requires some per-topic config - you need to specify which
> >> groups to
> >> > > > track.
> >> > > > >> this becomes a problem because:
> >> > > > >>     1.1 - relatively complicated code, to be written in the
> >> broker.
> >> > > > >>     1.2 - configuration needs to be maintained up to date by
> >> topic
> >> > > > >> "owners" - of which we have thousands. failure to do so would
> >> > decrease
> >> > > > the
> >> > > > >> cost benefit.
> >> > > > >>     1.3 - some applications have a "reconsume" / "reinit" /
> >> > > "bootstrap"
> >> > > > >> workflow where they will reset their offsets to an earlier
> value
> >> > than
> >> > > > the
> >> > > > >> one stored. this means that a stored offset of X does not
> always
> >> > mean
> >> > > > you
> >> > > > >> can clean up to X-1. think of it as video encoding -some apps
> >> have
> >> > > "key
> >> > > > >> frames" they may seek back to which are before their current
> >> offset.
> >> > > > >>     1.4 - there are multiple possible strategies - you could
> >> clean
> >> > up
> >> > > > >> aggressively, retain some "time distance" from latest, some
> >> "offset
> >> > > > >> distance", etc. this we think would have made it very hard to
> >> agree
> >> > > on a
> >> > > > >> single "correct" implementation that everyone would be happy
> >> with.
> >> > it
> >> > > > would
> >> > > > >> be better to include the raw functionality in the API and leave
> >> the
> >> > > > >> "brains" to an external monitoring system where people could
> >> > > > custom-taylor
> >> > > > >> their logic
> >> > > > >>
> >> > > > >> 2. ad-hoc consumer groups: its common practice for devs to spin
> >> up
> >> > > > >> console consumers and connect to a topic as a debug aid. SREs
> may
> >> > also
> >> > > > do
> >> > > > >> this. there are also various other eco-system applications that
> >> may
> >> > > > >> consumer from topics (unknown to topic owners as those are
> infra
> >> > > > monitoring
> >> > > > >> tools). obviously such consumer-groups' offsets should be
> ignored
> >> > for
> >> > > > >> purposes of clean-up, but coming up with a bullet-proof way to
> do
> >> > this
> >> > > > is
> >> > > > >> non-trivial and again ties with implementation complexity and
> >> > > > inflexibility
> >> > > > >> of a "one size fits all" solution in 1.4 above.
> >> > > > >>
> >> > > > >> 3. forceful clean-up: we have workflows that use kafka to move
> >> > > gigantic
> >> > > > >> blobs from offline hadoop processing flows into systems. the
> data
> >> > > being
> >> > > > >> "loaded" into such an online system can be several GBs in side
> >> and
> >> > > take
> >> > > > a
> >> > > > >> long time to consume (they are sliced into many small msgs).
> >> > sometimes
> >> > > > the
> >> > > > >> sender wants to abort and start a new blob before the current
> >> load
> >> > > > process
> >> > > > >> has completed - meaning the consumer's offsets are not yet
> caught
> >> > up.
> >> > > > >>
> >> > > > >> 4. offsets outside of kafka: yes, you could force applications
> to
> >> > > store
> >> > > > >> their offsets twice, but thats inefficient. its better to
> expose
> >> a
> >> > > raw,
> >> > > > >> simple API and let such applications manage their own clean-up
> >> logic
> >> > > > (this
> >> > > > >> again ties into 1.4 and no "one size fits all" solution)
> >> > > > >>
> >> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindong28@gmail.com
> >
> >> > > wrote:
> >> > > > >>
> >> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> >> > > > >>> ewen@confluent.io>
> >> > > > >>> wrote:
> >> > > > >>>
> >> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <
> lindong28@gmail.com
> >> >
> >> > > > wrote:
> >> > > > >>> >
> >> > > > >>> > > Hey Ewen,
> >> > > > >>> > >
> >> > > > >>> > > Thanks for the review. As Radai explained, it would be
> >> complex
> >> > in
> >> > > > >>> terms
> >> > > > >>> > of
> >> > > > >>> > > user configuration if we were to use committed offset to
> >> decide
> >> > > > data
> >> > > > >>> > > deletion. We need a way to specify which groups need to
> >> consume
> >> > > > data
> >> > > > >>> of
> >> > > > >>> > > this partition. The broker will also need to consume the
> >> entire
> >> > > > >>> offsets
> >> > > > >>> > > topic in that approach which has some overhead. I don't
> >> think
> >> > it
> >> > > is
> >> > > > >>> that
> >> > > > >>> > > hard to implement. But it will likely take more time to
> >> discuss
> >> > > > that
> >> > > > >>> > > approach due to the new config and the server side
> overhead.
> >> > > > >>> > >
> >> > > > >>> > > We choose to put this API in AdminClient because the API
> is
> >> > more
> >> > > > >>> like an
> >> > > > >>> > > administrative operation (such as listGroups,
> deleteTopics)
> >> > than
> >> > > a
> >> > > > >>> > consumer
> >> > > > >>> > > operation. It is not necessarily called by consumer only.
> >> For
> >> > > > >>> example, we
> >> > > > >>> > > can implement the "delete data before committed offset"
> >> > approach
> >> > > by
> >> > > > >>> > running
> >> > > > >>> > > an external service which calls purgeDataBefore() API
> based
> >> on
> >> > > > >>> committed
> >> > > > >>> > > offset of consumer groups.
> >> > > > >>> > >
> >> > > > >>> > > I am not aware that AdminClient is not a public API.
> >> Suppose it
> >> > > is
> >> > > > >>> not
> >> > > > >>> > > public now, I assume we plan to make it public in the
> >> future as
> >> > > > part
> >> > > > >>> of
> >> > > > >>> > > KIP-4. Are we not making it public because its interface
> is
> >> not
> >> > > > >>> stable?
> >> > > > >>> > If
> >> > > > >>> > > so, can we just tag this new API as not stable in the
> code?
> >> > > > >>> > >
> >> > > > >>> >
> >> > > > >>> >
> >> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> >> > > implementation.
> >> > > > >>> It's
> >> > > > >>> > definitely confusing that both will be (could be?) named
> >> > > AdminClient,
> >> > > > >>> but
> >> > > > >>> > we've kept the existing Scala AdminClient out of the public
> >> API
> >> > and
> >> > > > >>> have
> >> > > > >>> > not required KIPs for changes to it.
> >> > > > >>> >
> >> > > > >>> > That said, I agree that if this type of API makes it into
> >> Kafka,
> >> > > > >>> having a
> >> > > > >>> > (new, Java-based) AdminClient method would definitely be a
> >> good
> >> > > idea.
> >> > > > >>> An
> >> > > > >>> > alternative path might be to have a Consumer-based
> >> implementation
> >> > > > since
> >> > > > >>> > that seems like a very intuitive, natural way to use the
> >> > protocol.
> >> > > I
> >> > > > >>> think
> >> > > > >>> > optimizing for the expected use case would be a good idea.
> >> > > > >>> >
> >> > > > >>> > -Ewen
> >> > > > >>> >
> >> > > > >>> > Are you saying that the Scala AdminClient is not a public
> API
> >> and
> >> > > we
> >> > > > >>> discourage addition of any new feature to this class?
> >> > > > >>>
> >> > > > >>> I still prefer to add it to AdminClient (Java version in the
> >> future
> >> > > and
> >> > > > >>> Scala version in the short team) because I feel it belongs to
> >> admin
> >> > > > >>> operation instead of KafkaConsumer interface. For example, if
> in
> >> > the
> >> > > > >>> future
> >> > > > >>> we implement the "delete data before committed offset"
> strategy
> >> in
> >> > an
> >> > > > >>> external service, I feel it is a bit awkward if the service
> has
> >> to
> >> > > > >>> instantiate a KafkaConsumer and call
> >> KafkaConsumer.purgeDataBefore(
> >> > > > ...)
> >> > > > >>> to
> >> > > > >>> purge data. In other words, our expected use-case doesn't
> >> > necessarily
> >> > > > >>> bind
> >> > > > >>> this API with consumer.
> >> > > > >>>
> >> > > > >>> I am not strong on this issue. Let's see what other
> >> > > > committers/developers
> >> > > > >>> think about this.
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> >
> >> > > > >>> > >
> >> > > > >>> > > Thanks,
> >> > > > >>> > > Dong
> >> > > > >>> > >
> >> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> >> > > > >>> ewen@confluent.io
> >> > > > >>> > >
> >> > > > >>> > > wrote:
> >> > > > >>> > >
> >> > > > >>> > > > Dong,
> >> > > > >>> > > >
> >> > > > >>> > > > Looks like that's an internal link,
> >> > > > >>> > > > https://cwiki.apache.org/confl
> >> uence/display/KAFKA/KIP-107%
> >> > > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> >> > > > >>> > > > is the right one.
> >> > > > >>> > > >
> >> > > > >>> > > > I have a question about one of the rejected
> alternatives:
> >> > > > >>> > > >
> >> > > > >>> > > > > Using committed offset instead of an extra API to
> >> trigger
> >> > > data
> >> > > > >>> purge
> >> > > > >>> > > > operation.
> >> > > > >>> > > >
> >> > > > >>> > > > The KIP says this would be more complicated to
> implement.
> >> Why
> >> > > is
> >> > > > >>> that?
> >> > > > >>> > I
> >> > > > >>> > > > think brokers would have to consume the entire offsets
> >> topic,
> >> > > but
> >> > > > >>> the
> >> > > > >>> > > data
> >> > > > >>> > > > stored in memory doesn't seem to change and applying
> this
> >> > when
> >> > > > >>> updated
> >> > > > >>> > > > offsets are seen seems basically the same. It might also
> >> be
> >> > > > >>> possible to
> >> > > > >>> > > > make it work even with multiple consumer groups if that
> >> was
> >> > > > desired
> >> > > > >>> > > > (although that'd require tracking more data in memory)
> as
> >> a
> >> > > > >>> > > generalization
> >> > > > >>> > > > without requiring coordination between the consumer
> >> groups.
> >> > > Given
> >> > > > >>> the
> >> > > > >>> > > > motivation, I'm assuming this was considered unnecessary
> >> > since
> >> > > > this
> >> > > > >>> > > > specifically targets intermediate stream processing
> >> topics.
> >> > > > >>> > > >
> >> > > > >>> > > > Another question is why expose this via AdminClient
> (which
> >> > > isn't
> >> > > > >>> public
> >> > > > >>> > > API
> >> > > > >>> > > > afaik)? Why not, for example, expose it on the Consumer,
> >> > which
> >> > > is
> >> > > > >>> > > > presumably where you'd want access to it since the
> >> > > functionality
> >> > > > >>> > depends
> >> > > > >>> > > on
> >> > > > >>> > > > the consumer actually having consumed the data?
> >> > > > >>> > > >
> >> > > > >>> > > > -Ewen
> >> > > > >>> > > >
> >> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <
> >> > lindong28@gmail.com>
> >> > > > >>> wrote:
> >> > > > >>> > > >
> >> > > > >>> > > > > Hi all,
> >> > > > >>> > > > >
> >> > > > >>> > > > > We created KIP-107 to propose addition of
> >> purgeDataBefore()
> >> > > API
> >> > > > >>> in
> >> > > > >>> > > > > AdminClient.
> >> > > > >>> > > > >
> >> > > > >>> > > > > Please find the KIP wiki in the link
> >> > > > https://iwww.corp.linkedin.
> >> > > > >>> > > > > com/wiki/cf/display/ENGS/Kafka
> >> +purgeDataBefore%28%29+API+
> >> > > > >>> > > > design+proposal.
> >> > > > >>> > > > > We
> >> > > > >>> > > > > would love to hear your comments and suggestions.
> >> > > > >>> > > > >
> >> > > > >>> > > > > Thanks,
> >> > > > >>> > > > > Dong
> >> > > > >>> > > > >
> >> > > > >>> > > >
> >> > > > >>> > >
> >> > > > >>> >
> >> > > > >>>
> >> > > > >>
> >> > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >>
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
Bump up. I am going to initiate the vote If there is no further concern
with the KIP.

On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <li...@gmail.com> wrote:

> Hey Mayuresh,
>
> Thanks for the comment. If the message's offset is below low_watermark,
> then it should have been deleted by log retention policy. Thus it is OK not
> to expose this message to consumer. Does this answer your question?
>
> Thanks,
> Dong
>
> On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <
> gharatmayuresh15@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Thanks for the KIP.
>>
>> I had a question (which might have been answered before).
>>
>> 1) The KIP says that the low_water_mark will be updated periodically by
>> the
>> broker like high_water_mark.
>> Essentially we want to use low_water_mark for cases where an entire
>> segment
>> cannot be deleted because may be the segment_start_offset < PurgeOffset <
>> segment_end_offset, in which case we will set the low_water_mark to
>> PurgeOffset+1.
>>
>> 2) The KIP also says that messages below low_water_mark will not be
>> exposed
>> for consumers, which does make sense since we want say that data below
>> low_water_mark is purged.
>>
>> Looking at above conditions, does it make sense not to update the
>> low_water_mark periodically but only on PurgeRequest?
>> The reason being, if we update it periodically then as per 2) we will not
>> be allowing consumers to re-consume data that is not purged but is below
>> low_water_mark.
>>
>> Thanks,
>>
>> Mayuresh
>>
>>
>> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <li...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks for reviewing the KIP!
>> >
>> > 1. The low_watermark will be checkpointed in a new file named
>> >  "replication-low-watermark-checkpoint". It will have the same format
>> as
>> > the existing replication-offset-checkpoint file. This allows us the keep
>> > the existing format of checkpoint files which maps TopicPartition to
>> Long.
>> > I just updated the "Public Interface" section in the KIP wiki to explain
>> > this file.
>> >
>> > 2. I think using low_watermark from leader to trigger log retention in
>> the
>> > follower will work correctly in the sense that all messages with offset
>> <
>> > low_watermark can be deleted. But I am not sure that the efficiency is
>> the
>> > same, i.e. offset of messages which should be deleted (i.e. due to time
>> or
>> > size-based log retention policy) will be smaller than low_watermark from
>> > the leader.
>> >
>> > For example, say both the follower and the leader have messages with
>> > offsets in range [0, 2000]. If the follower does log rolling slightly
>> later
>> > than leader, the segments on follower would be [0, 1001], [1002, 2000]
>> and
>> > segments on leader would be [0, 1000], [1001, 2000]. After leader
>> deletes
>> > the first segment, the low_watermark would be 1001. Thus the first
>> segment
>> > would stay on follower's disk unnecessarily which may double disk usage
>> at
>> > worst.
>> >
>> > Since this approach doesn't save us much, I am inclined to not include
>> this
>> > change to keep the KIP simple.
>> >
>> > Dong
>> >
>> >
>> >
>> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the proposal. Looks good overall. A couple of comments.
>> > >
>> > > 1. Where is the low_watermark checkpointed? Is that
>> > > in replication-offset-checkpoint? If so, do we need to bump up the
>> > version?
>> > > Could you also describe the format change?
>> > >
>> > > 2. For topics with "delete" retention, currently we let each replica
>> > delete
>> > > old segments independently. With low_watermark, we could just let
>> leaders
>> > > delete old segments through the deletion policy and the followers will
>> > > simply delete old segments based on low_watermark. Not sure if this
>> saves
>> > > much, but is a potential option that may be worth thinking about.
>> > >
>> > > Jun
>> > >
>> > >
>> > >
>> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com>
>> > wrote:
>> > >
>> > > > one more example of complicated config - mirror maker.
>> > > >
>> > > > we definitely cant trust each and every topic owner to configure
>> their
>> > > > topics not to purge before they've been mirrored.
>> > > > which would mean there's a per-topic config (set by the owner) and a
>> > > > "global" config (where mirror makers are specified) and they need
>> to be
>> > > > "merged".
>> > > > for those topics that _are_ mirrored.
>> > > > which is a changing set of topics thats stored in an external system
>> > > > outside of kafka.
>> > > > if a topic is taken out of the mirror set the MM offset would be
>> > "frozen"
>> > > > at that point and prevent clean-up for all eternity, unless its
>> > > cleaned-up
>> > > > itself.
>> > > >
>> > > > ...
>> > > >
>> > > > complexity :-)
>> > > >
>> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > in summary - i'm not opposed to the idea of a per-topic clean up
>> > config
>> > > > > that tracks some set of consumer groups' offsets (which would
>> > probably
>> > > > work
>> > > > > for 80% of use cases), but i definitely see a need to expose a
>> simple
>> > > API
>> > > > > for the more advanced/obscure/custom use cases (the other 20%).
>> > > > >
>> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenblatt@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > >> a major motivation for this KIP is cost savings.
>> > > > >>
>> > > > >> lots of internal systems at LI use kafka as an intermediate pipe,
>> > and
>> > > > set
>> > > > >> the topic retention period to a "safe enough" amount of time to
>> be
>> > > able
>> > > > to
>> > > > >> recover from crashes/downtime and catch up to "now". this results
>> > in a
>> > > > few
>> > > > >> days' worth of retention typically.
>> > > > >>
>> > > > >> however, under normal operating conditions the consumers are
>> mostly
>> > > > >> caught-up and so early clean-up enables a big cost savings in
>> > storage.
>> > > > >>
>> > > > >> as for my points:
>> > > > >>
>> > > > >> 1. when discussing implementation options for automatic clean-up
>> we
>> > > > >> realized that cleaning up by keeping track of offsets stored in
>> > kafka
>> > > > >> requires some per-topic config - you need to specify which
>> groups to
>> > > > track.
>> > > > >> this becomes a problem because:
>> > > > >>     1.1 - relatively complicated code, to be written in the
>> broker.
>> > > > >>     1.2 - configuration needs to be maintained up to date by
>> topic
>> > > > >> "owners" - of which we have thousands. failure to do so would
>> > decrease
>> > > > the
>> > > > >> cost benefit.
>> > > > >>     1.3 - some applications have a "reconsume" / "reinit" /
>> > > "bootstrap"
>> > > > >> workflow where they will reset their offsets to an earlier value
>> > than
>> > > > the
>> > > > >> one stored. this means that a stored offset of X does not always
>> > mean
>> > > > you
>> > > > >> can clean up to X-1. think of it as video encoding -some apps
>> have
>> > > "key
>> > > > >> frames" they may seek back to which are before their current
>> offset.
>> > > > >>     1.4 - there are multiple possible strategies - you could
>> clean
>> > up
>> > > > >> aggressively, retain some "time distance" from latest, some
>> "offset
>> > > > >> distance", etc. this we think would have made it very hard to
>> agree
>> > > on a
>> > > > >> single "correct" implementation that everyone would be happy
>> with.
>> > it
>> > > > would
>> > > > >> be better to include the raw functionality in the API and leave
>> the
>> > > > >> "brains" to an external monitoring system where people could
>> > > > custom-taylor
>> > > > >> their logic
>> > > > >>
>> > > > >> 2. ad-hoc consumer groups: its common practice for devs to spin
>> up
>> > > > >> console consumers and connect to a topic as a debug aid. SREs may
>> > also
>> > > > do
>> > > > >> this. there are also various other eco-system applications that
>> may
>> > > > >> consumer from topics (unknown to topic owners as those are infra
>> > > > monitoring
>> > > > >> tools). obviously such consumer-groups' offsets should be ignored
>> > for
>> > > > >> purposes of clean-up, but coming up with a bullet-proof way to do
>> > this
>> > > > is
>> > > > >> non-trivial and again ties with implementation complexity and
>> > > > inflexibility
>> > > > >> of a "one size fits all" solution in 1.4 above.
>> > > > >>
>> > > > >> 3. forceful clean-up: we have workflows that use kafka to move
>> > > gigantic
>> > > > >> blobs from offline hadoop processing flows into systems. the data
>> > > being
>> > > > >> "loaded" into such an online system can be several GBs in side
>> and
>> > > take
>> > > > a
>> > > > >> long time to consume (they are sliced into many small msgs).
>> > sometimes
>> > > > the
>> > > > >> sender wants to abort and start a new blob before the current
>> load
>> > > > process
>> > > > >> has completed - meaning the consumer's offsets are not yet caught
>> > up.
>> > > > >>
>> > > > >> 4. offsets outside of kafka: yes, you could force applications to
>> > > store
>> > > > >> their offsets twice, but thats inefficient. its better to expose
>> a
>> > > raw,
>> > > > >> simple API and let such applications manage their own clean-up
>> logic
>> > > > (this
>> > > > >> again ties into 1.4 and no "one size fits all" solution)
>> > > > >>
>> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com>
>> > > wrote:
>> > > > >>
>> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
>> > > > >>> ewen@confluent.io>
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindong28@gmail.com
>> >
>> > > > wrote:
>> > > > >>> >
>> > > > >>> > > Hey Ewen,
>> > > > >>> > >
>> > > > >>> > > Thanks for the review. As Radai explained, it would be
>> complex
>> > in
>> > > > >>> terms
>> > > > >>> > of
>> > > > >>> > > user configuration if we were to use committed offset to
>> decide
>> > > > data
>> > > > >>> > > deletion. We need a way to specify which groups need to
>> consume
>> > > > data
>> > > > >>> of
>> > > > >>> > > this partition. The broker will also need to consume the
>> entire
>> > > > >>> offsets
>> > > > >>> > > topic in that approach which has some overhead. I don't
>> think
>> > it
>> > > is
>> > > > >>> that
>> > > > >>> > > hard to implement. But it will likely take more time to
>> discuss
>> > > > that
>> > > > >>> > > approach due to the new config and the server side overhead.
>> > > > >>> > >
>> > > > >>> > > We choose to put this API in AdminClient because the API is
>> > more
>> > > > >>> like an
>> > > > >>> > > administrative operation (such as listGroups, deleteTopics)
>> > than
>> > > a
>> > > > >>> > consumer
>> > > > >>> > > operation. It is not necessarily called by consumer only.
>> For
>> > > > >>> example, we
>> > > > >>> > > can implement the "delete data before committed offset"
>> > approach
>> > > by
>> > > > >>> > running
>> > > > >>> > > an external service which calls purgeDataBefore() API based
>> on
>> > > > >>> committed
>> > > > >>> > > offset of consumer groups.
>> > > > >>> > >
>> > > > >>> > > I am not aware that AdminClient is not a public API.
>> Suppose it
>> > > is
>> > > > >>> not
>> > > > >>> > > public now, I assume we plan to make it public in the
>> future as
>> > > > part
>> > > > >>> of
>> > > > >>> > > KIP-4. Are we not making it public because its interface is
>> not
>> > > > >>> stable?
>> > > > >>> > If
>> > > > >>> > > so, can we just tag this new API as not stable in the code?
>> > > > >>> > >
>> > > > >>> >
>> > > > >>> >
>> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
>> > > implementation.
>> > > > >>> It's
>> > > > >>> > definitely confusing that both will be (could be?) named
>> > > AdminClient,
>> > > > >>> but
>> > > > >>> > we've kept the existing Scala AdminClient out of the public
>> API
>> > and
>> > > > >>> have
>> > > > >>> > not required KIPs for changes to it.
>> > > > >>> >
>> > > > >>> > That said, I agree that if this type of API makes it into
>> Kafka,
>> > > > >>> having a
>> > > > >>> > (new, Java-based) AdminClient method would definitely be a
>> good
>> > > idea.
>> > > > >>> An
>> > > > >>> > alternative path might be to have a Consumer-based
>> implementation
>> > > > since
>> > > > >>> > that seems like a very intuitive, natural way to use the
>> > protocol.
>> > > I
>> > > > >>> think
>> > > > >>> > optimizing for the expected use case would be a good idea.
>> > > > >>> >
>> > > > >>> > -Ewen
>> > > > >>> >
>> > > > >>> > Are you saying that the Scala AdminClient is not a public API
>> and
>> > > we
>> > > > >>> discourage addition of any new feature to this class?
>> > > > >>>
>> > > > >>> I still prefer to add it to AdminClient (Java version in the
>> future
>> > > and
>> > > > >>> Scala version in the short team) because I feel it belongs to
>> admin
>> > > > >>> operation instead of KafkaConsumer interface. For example, if in
>> > the
>> > > > >>> future
>> > > > >>> we implement the "delete data before committed offset" strategy
>> in
>> > an
>> > > > >>> external service, I feel it is a bit awkward if the service has
>> to
>> > > > >>> instantiate a KafkaConsumer and call
>> KafkaConsumer.purgeDataBefore(
>> > > > ...)
>> > > > >>> to
>> > > > >>> purge data. In other words, our expected use-case doesn't
>> > necessarily
>> > > > >>> bind
>> > > > >>> this API with consumer.
>> > > > >>>
>> > > > >>> I am not strong on this issue. Let's see what other
>> > > > committers/developers
>> > > > >>> think about this.
>> > > > >>>
>> > > > >>>
>> > > > >>> >
>> > > > >>> > >
>> > > > >>> > > Thanks,
>> > > > >>> > > Dong
>> > > > >>> > >
>> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
>> > > > >>> ewen@confluent.io
>> > > > >>> > >
>> > > > >>> > > wrote:
>> > > > >>> > >
>> > > > >>> > > > Dong,
>> > > > >>> > > >
>> > > > >>> > > > Looks like that's an internal link,
>> > > > >>> > > > https://cwiki.apache.org/confl
>> uence/display/KAFKA/KIP-107%
>> > > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> > > > >>> > > > is the right one.
>> > > > >>> > > >
>> > > > >>> > > > I have a question about one of the rejected alternatives:
>> > > > >>> > > >
>> > > > >>> > > > > Using committed offset instead of an extra API to
>> trigger
>> > > data
>> > > > >>> purge
>> > > > >>> > > > operation.
>> > > > >>> > > >
>> > > > >>> > > > The KIP says this would be more complicated to implement.
>> Why
>> > > is
>> > > > >>> that?
>> > > > >>> > I
>> > > > >>> > > > think brokers would have to consume the entire offsets
>> topic,
>> > > but
>> > > > >>> the
>> > > > >>> > > data
>> > > > >>> > > > stored in memory doesn't seem to change and applying this
>> > when
>> > > > >>> updated
>> > > > >>> > > > offsets are seen seems basically the same. It might also
>> be
>> > > > >>> possible to
>> > > > >>> > > > make it work even with multiple consumer groups if that
>> was
>> > > > desired
>> > > > >>> > > > (although that'd require tracking more data in memory) as
>> a
>> > > > >>> > > generalization
>> > > > >>> > > > without requiring coordination between the consumer
>> groups.
>> > > Given
>> > > > >>> the
>> > > > >>> > > > motivation, I'm assuming this was considered unnecessary
>> > since
>> > > > this
>> > > > >>> > > > specifically targets intermediate stream processing
>> topics.
>> > > > >>> > > >
>> > > > >>> > > > Another question is why expose this via AdminClient (which
>> > > isn't
>> > > > >>> public
>> > > > >>> > > API
>> > > > >>> > > > afaik)? Why not, for example, expose it on the Consumer,
>> > which
>> > > is
>> > > > >>> > > > presumably where you'd want access to it since the
>> > > functionality
>> > > > >>> > depends
>> > > > >>> > > on
>> > > > >>> > > > the consumer actually having consumed the data?
>> > > > >>> > > >
>> > > > >>> > > > -Ewen
>> > > > >>> > > >
>> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <
>> > lindong28@gmail.com>
>> > > > >>> wrote:
>> > > > >>> > > >
>> > > > >>> > > > > Hi all,
>> > > > >>> > > > >
>> > > > >>> > > > > We created KIP-107 to propose addition of
>> purgeDataBefore()
>> > > API
>> > > > >>> in
>> > > > >>> > > > > AdminClient.
>> > > > >>> > > > >
>> > > > >>> > > > > Please find the KIP wiki in the link
>> > > > https://iwww.corp.linkedin.
>> > > > >>> > > > > com/wiki/cf/display/ENGS/Kafka
>> +purgeDataBefore%28%29+API+
>> > > > >>> > > > design+proposal.
>> > > > >>> > > > > We
>> > > > >>> > > > > would love to hear your comments and suggestions.
>> > > > >>> > > > >
>> > > > >>> > > > > Thanks,
>> > > > >>> > > > > Dong
>> > > > >>> > > > >
>> > > > >>> > > >
>> > > > >>> > >
>> > > > >>> >
>> > > > >>>
>> > > > >>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hey Mayuresh,

Thanks for the comment. If the message's offset is below low_watermark,
then it should have been deleted by log retention policy. Thus it is OK not
to expose this message to consumer. Does this answer your question?

Thanks,
Dong

On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gh...@gmail.com>
wrote:

> Hi Dong,
>
> Thanks for the KIP.
>
> I had a question (which might have been answered before).
>
> 1) The KIP says that the low_water_mark will be updated periodically by the
> broker like high_water_mark.
> Essentially we want to use low_water_mark for cases where an entire segment
> cannot be deleted because may be the segment_start_offset < PurgeOffset <
> segment_end_offset, in which case we will set the low_water_mark to
> PurgeOffset+1.
>
> 2) The KIP also says that messages below low_water_mark will not be exposed
> for consumers, which does make sense since we want say that data below
> low_water_mark is purged.
>
> Looking at above conditions, does it make sense not to update the
> low_water_mark periodically but only on PurgeRequest?
> The reason being, if we update it periodically then as per 2) we will not
> be allowing consumers to re-consume data that is not purged but is below
> low_water_mark.
>
> Thanks,
>
> Mayuresh
>
>
> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <li...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for reviewing the KIP!
> >
> > 1. The low_watermark will be checkpointed in a new file named
> >  "replication-low-watermark-checkpoint". It will have the same format as
> > the existing replication-offset-checkpoint file. This allows us the keep
> > the existing format of checkpoint files which maps TopicPartition to
> Long.
> > I just updated the "Public Interface" section in the KIP wiki to explain
> > this file.
> >
> > 2. I think using low_watermark from leader to trigger log retention in
> the
> > follower will work correctly in the sense that all messages with offset <
> > low_watermark can be deleted. But I am not sure that the efficiency is
> the
> > same, i.e. offset of messages which should be deleted (i.e. due to time
> or
> > size-based log retention policy) will be smaller than low_watermark from
> > the leader.
> >
> > For example, say both the follower and the leader have messages with
> > offsets in range [0, 2000]. If the follower does log rolling slightly
> later
> > than leader, the segments on follower would be [0, 1001], [1002, 2000]
> and
> > segments on leader would be [0, 1000], [1001, 2000]. After leader deletes
> > the first segment, the low_watermark would be 1001. Thus the first
> segment
> > would stay on follower's disk unnecessarily which may double disk usage
> at
> > worst.
> >
> > Since this approach doesn't save us much, I am inclined to not include
> this
> > change to keep the KIP simple.
> >
> > Dong
> >
> >
> >
> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >
> > > 1. Where is the low_watermark checkpointed? Is that
> > > in replication-offset-checkpoint? If so, do we need to bump up the
> > version?
> > > Could you also describe the format change?
> > >
> > > 2. For topics with "delete" retention, currently we let each replica
> > delete
> > > old segments independently. With low_watermark, we could just let
> leaders
> > > delete old segments through the deletion policy and the followers will
> > > simply delete old segments based on low_watermark. Not sure if this
> saves
> > > much, but is a potential option that may be worth thinking about.
> > >
> > > Jun
> > >
> > >
> > >
> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com>
> > wrote:
> > >
> > > > one more example of complicated config - mirror maker.
> > > >
> > > > we definitely cant trust each and every topic owner to configure
> their
> > > > topics not to purge before they've been mirrored.
> > > > which would mean there's a per-topic config (set by the owner) and a
> > > > "global" config (where mirror makers are specified) and they need to
> be
> > > > "merged".
> > > > for those topics that _are_ mirrored.
> > > > which is a changing set of topics thats stored in an external system
> > > > outside of kafka.
> > > > if a topic is taken out of the mirror set the MM offset would be
> > "frozen"
> > > > at that point and prevent clean-up for all eternity, unless its
> > > cleaned-up
> > > > itself.
> > > >
> > > > ...
> > > >
> > > > complexity :-)
> > > >
> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com>
> > > wrote:
> > > >
> > > > > in summary - i'm not opposed to the idea of a per-topic clean up
> > config
> > > > > that tracks some set of consumer groups' offsets (which would
> > probably
> > > > work
> > > > > for 80% of use cases), but i definitely see a need to expose a
> simple
> > > API
> > > > > for the more advanced/obscure/custom use cases (the other 20%).
> > > > >
> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> a major motivation for this KIP is cost savings.
> > > > >>
> > > > >> lots of internal systems at LI use kafka as an intermediate pipe,
> > and
> > > > set
> > > > >> the topic retention period to a "safe enough" amount of time to be
> > > able
> > > > to
> > > > >> recover from crashes/downtime and catch up to "now". this results
> > in a
> > > > few
> > > > >> days' worth of retention typically.
> > > > >>
> > > > >> however, under normal operating conditions the consumers are
> mostly
> > > > >> caught-up and so early clean-up enables a big cost savings in
> > storage.
> > > > >>
> > > > >> as for my points:
> > > > >>
> > > > >> 1. when discussing implementation options for automatic clean-up
> we
> > > > >> realized that cleaning up by keeping track of offsets stored in
> > kafka
> > > > >> requires some per-topic config - you need to specify which groups
> to
> > > > track.
> > > > >> this becomes a problem because:
> > > > >>     1.1 - relatively complicated code, to be written in the
> broker.
> > > > >>     1.2 - configuration needs to be maintained up to date by topic
> > > > >> "owners" - of which we have thousands. failure to do so would
> > decrease
> > > > the
> > > > >> cost benefit.
> > > > >>     1.3 - some applications have a "reconsume" / "reinit" /
> > > "bootstrap"
> > > > >> workflow where they will reset their offsets to an earlier value
> > than
> > > > the
> > > > >> one stored. this means that a stored offset of X does not always
> > mean
> > > > you
> > > > >> can clean up to X-1. think of it as video encoding -some apps have
> > > "key
> > > > >> frames" they may seek back to which are before their current
> offset.
> > > > >>     1.4 - there are multiple possible strategies - you could clean
> > up
> > > > >> aggressively, retain some "time distance" from latest, some
> "offset
> > > > >> distance", etc. this we think would have made it very hard to
> agree
> > > on a
> > > > >> single "correct" implementation that everyone would be happy with.
> > it
> > > > would
> > > > >> be better to include the raw functionality in the API and leave
> the
> > > > >> "brains" to an external monitoring system where people could
> > > > custom-taylor
> > > > >> their logic
> > > > >>
> > > > >> 2. ad-hoc consumer groups: its common practice for devs to spin up
> > > > >> console consumers and connect to a topic as a debug aid. SREs may
> > also
> > > > do
> > > > >> this. there are also various other eco-system applications that
> may
> > > > >> consumer from topics (unknown to topic owners as those are infra
> > > > monitoring
> > > > >> tools). obviously such consumer-groups' offsets should be ignored
> > for
> > > > >> purposes of clean-up, but coming up with a bullet-proof way to do
> > this
> > > > is
> > > > >> non-trivial and again ties with implementation complexity and
> > > > inflexibility
> > > > >> of a "one size fits all" solution in 1.4 above.
> > > > >>
> > > > >> 3. forceful clean-up: we have workflows that use kafka to move
> > > gigantic
> > > > >> blobs from offline hadoop processing flows into systems. the data
> > > being
> > > > >> "loaded" into such an online system can be several GBs in side and
> > > take
> > > > a
> > > > >> long time to consume (they are sliced into many small msgs).
> > sometimes
> > > > the
> > > > >> sender wants to abort and start a new blob before the current load
> > > > process
> > > > >> has completed - meaning the consumer's offsets are not yet caught
> > up.
> > > > >>
> > > > >> 4. offsets outside of kafka: yes, you could force applications to
> > > store
> > > > >> their offsets twice, but thats inefficient. its better to expose a
> > > raw,
> > > > >> simple API and let such applications manage their own clean-up
> logic
> > > > (this
> > > > >> again ties into 1.4 and no "one size fits all" solution)
> > > > >>
> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com>
> > > wrote:
> > > > >>
> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> > > > >>> ewen@confluent.io>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com>
> > > > wrote:
> > > > >>> >
> > > > >>> > > Hey Ewen,
> > > > >>> > >
> > > > >>> > > Thanks for the review. As Radai explained, it would be
> complex
> > in
> > > > >>> terms
> > > > >>> > of
> > > > >>> > > user configuration if we were to use committed offset to
> decide
> > > > data
> > > > >>> > > deletion. We need a way to specify which groups need to
> consume
> > > > data
> > > > >>> of
> > > > >>> > > this partition. The broker will also need to consume the
> entire
> > > > >>> offsets
> > > > >>> > > topic in that approach which has some overhead. I don't think
> > it
> > > is
> > > > >>> that
> > > > >>> > > hard to implement. But it will likely take more time to
> discuss
> > > > that
> > > > >>> > > approach due to the new config and the server side overhead.
> > > > >>> > >
> > > > >>> > > We choose to put this API in AdminClient because the API is
> > more
> > > > >>> like an
> > > > >>> > > administrative operation (such as listGroups, deleteTopics)
> > than
> > > a
> > > > >>> > consumer
> > > > >>> > > operation. It is not necessarily called by consumer only. For
> > > > >>> example, we
> > > > >>> > > can implement the "delete data before committed offset"
> > approach
> > > by
> > > > >>> > running
> > > > >>> > > an external service which calls purgeDataBefore() API based
> on
> > > > >>> committed
> > > > >>> > > offset of consumer groups.
> > > > >>> > >
> > > > >>> > > I am not aware that AdminClient is not a public API. Suppose
> it
> > > is
> > > > >>> not
> > > > >>> > > public now, I assume we plan to make it public in the future
> as
> > > > part
> > > > >>> of
> > > > >>> > > KIP-4. Are we not making it public because its interface is
> not
> > > > >>> stable?
> > > > >>> > If
> > > > >>> > > so, can we just tag this new API as not stable in the code?
> > > > >>> > >
> > > > >>> >
> > > > >>> >
> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> > > implementation.
> > > > >>> It's
> > > > >>> > definitely confusing that both will be (could be?) named
> > > AdminClient,
> > > > >>> but
> > > > >>> > we've kept the existing Scala AdminClient out of the public API
> > and
> > > > >>> have
> > > > >>> > not required KIPs for changes to it.
> > > > >>> >
> > > > >>> > That said, I agree that if this type of API makes it into
> Kafka,
> > > > >>> having a
> > > > >>> > (new, Java-based) AdminClient method would definitely be a good
> > > idea.
> > > > >>> An
> > > > >>> > alternative path might be to have a Consumer-based
> implementation
> > > > since
> > > > >>> > that seems like a very intuitive, natural way to use the
> > protocol.
> > > I
> > > > >>> think
> > > > >>> > optimizing for the expected use case would be a good idea.
> > > > >>> >
> > > > >>> > -Ewen
> > > > >>> >
> > > > >>> > Are you saying that the Scala AdminClient is not a public API
> and
> > > we
> > > > >>> discourage addition of any new feature to this class?
> > > > >>>
> > > > >>> I still prefer to add it to AdminClient (Java version in the
> future
> > > and
> > > > >>> Scala version in the short team) because I feel it belongs to
> admin
> > > > >>> operation instead of KafkaConsumer interface. For example, if in
> > the
> > > > >>> future
> > > > >>> we implement the "delete data before committed offset" strategy
> in
> > an
> > > > >>> external service, I feel it is a bit awkward if the service has
> to
> > > > >>> instantiate a KafkaConsumer and call
> KafkaConsumer.purgeDataBefore(
> > > > ...)
> > > > >>> to
> > > > >>> purge data. In other words, our expected use-case doesn't
> > necessarily
> > > > >>> bind
> > > > >>> this API with consumer.
> > > > >>>
> > > > >>> I am not strong on this issue. Let's see what other
> > > > committers/developers
> > > > >>> think about this.
> > > > >>>
> > > > >>>
> > > > >>> >
> > > > >>> > >
> > > > >>> > > Thanks,
> > > > >>> > > Dong
> > > > >>> > >
> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> > > > >>> ewen@confluent.io
> > > > >>> > >
> > > > >>> > > wrote:
> > > > >>> > >
> > > > >>> > > > Dong,
> > > > >>> > > >
> > > > >>> > > > Looks like that's an internal link,
> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > > >>> > > > is the right one.
> > > > >>> > > >
> > > > >>> > > > I have a question about one of the rejected alternatives:
> > > > >>> > > >
> > > > >>> > > > > Using committed offset instead of an extra API to trigger
> > > data
> > > > >>> purge
> > > > >>> > > > operation.
> > > > >>> > > >
> > > > >>> > > > The KIP says this would be more complicated to implement.
> Why
> > > is
> > > > >>> that?
> > > > >>> > I
> > > > >>> > > > think brokers would have to consume the entire offsets
> topic,
> > > but
> > > > >>> the
> > > > >>> > > data
> > > > >>> > > > stored in memory doesn't seem to change and applying this
> > when
> > > > >>> updated
> > > > >>> > > > offsets are seen seems basically the same. It might also be
> > > > >>> possible to
> > > > >>> > > > make it work even with multiple consumer groups if that was
> > > > desired
> > > > >>> > > > (although that'd require tracking more data in memory) as a
> > > > >>> > > generalization
> > > > >>> > > > without requiring coordination between the consumer groups.
> > > Given
> > > > >>> the
> > > > >>> > > > motivation, I'm assuming this was considered unnecessary
> > since
> > > > this
> > > > >>> > > > specifically targets intermediate stream processing topics.
> > > > >>> > > >
> > > > >>> > > > Another question is why expose this via AdminClient (which
> > > isn't
> > > > >>> public
> > > > >>> > > API
> > > > >>> > > > afaik)? Why not, for example, expose it on the Consumer,
> > which
> > > is
> > > > >>> > > > presumably where you'd want access to it since the
> > > functionality
> > > > >>> > depends
> > > > >>> > > on
> > > > >>> > > > the consumer actually having consumed the data?
> > > > >>> > > >
> > > > >>> > > > -Ewen
> > > > >>> > > >
> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <
> > lindong28@gmail.com>
> > > > >>> wrote:
> > > > >>> > > >
> > > > >>> > > > > Hi all,
> > > > >>> > > > >
> > > > >>> > > > > We created KIP-107 to propose addition of
> purgeDataBefore()
> > > API
> > > > >>> in
> > > > >>> > > > > AdminClient.
> > > > >>> > > > >
> > > > >>> > > > > Please find the KIP wiki in the link
> > > > https://iwww.corp.linkedin.
> > > > >>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+
> API+
> > > > >>> > > > design+proposal.
> > > > >>> > > > > We
> > > > >>> > > > > would love to hear your comments and suggestions.
> > > > >>> > > > >
> > > > >>> > > > > Thanks,
> > > > >>> > > > > Dong
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Mayuresh Gharat <gh...@gmail.com>.
Hi Dong,

Thanks for the KIP.

I had a question (which might have been answered before).

1) The KIP says that the low_water_mark will be updated periodically by the
broker like high_water_mark.
Essentially we want to use low_water_mark for cases where an entire segment
cannot be deleted because may be the segment_start_offset < PurgeOffset <
segment_end_offset, in which case we will set the low_water_mark to
PurgeOffset+1.

2) The KIP also says that messages below low_water_mark will not be exposed
for consumers, which does make sense since we want say that data below
low_water_mark is purged.

Looking at above conditions, does it make sense not to update the
low_water_mark periodically but only on PurgeRequest?
The reason being, if we update it periodically then as per 2) we will not
be allowing consumers to re-consume data that is not purged but is below
low_water_mark.

Thanks,

Mayuresh


On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <li...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for reviewing the KIP!
>
> 1. The low_watermark will be checkpointed in a new file named
>  "replication-low-watermark-checkpoint". It will have the same format as
> the existing replication-offset-checkpoint file. This allows us the keep
> the existing format of checkpoint files which maps TopicPartition to Long.
> I just updated the "Public Interface" section in the KIP wiki to explain
> this file.
>
> 2. I think using low_watermark from leader to trigger log retention in the
> follower will work correctly in the sense that all messages with offset <
> low_watermark can be deleted. But I am not sure that the efficiency is the
> same, i.e. offset of messages which should be deleted (i.e. due to time or
> size-based log retention policy) will be smaller than low_watermark from
> the leader.
>
> For example, say both the follower and the leader have messages with
> offsets in range [0, 2000]. If the follower does log rolling slightly later
> than leader, the segments on follower would be [0, 1001], [1002, 2000] and
> segments on leader would be [0, 1000], [1001, 2000]. After leader deletes
> the first segment, the low_watermark would be 1001. Thus the first segment
> would stay on follower's disk unnecessarily which may double disk usage at
> worst.
>
> Since this approach doesn't save us much, I am inclined to not include this
> change to keep the KIP simple.
>
> Dong
>
>
>
> On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the proposal. Looks good overall. A couple of comments.
> >
> > 1. Where is the low_watermark checkpointed? Is that
> > in replication-offset-checkpoint? If so, do we need to bump up the
> version?
> > Could you also describe the format change?
> >
> > 2. For topics with "delete" retention, currently we let each replica
> delete
> > old segments independently. With low_watermark, we could just let leaders
> > delete old segments through the deletion policy and the followers will
> > simply delete old segments based on low_watermark. Not sure if this saves
> > much, but is a potential option that may be worth thinking about.
> >
> > Jun
> >
> >
> >
> > On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com>
> wrote:
> >
> > > one more example of complicated config - mirror maker.
> > >
> > > we definitely cant trust each and every topic owner to configure their
> > > topics not to purge before they've been mirrored.
> > > which would mean there's a per-topic config (set by the owner) and a
> > > "global" config (where mirror makers are specified) and they need to be
> > > "merged".
> > > for those topics that _are_ mirrored.
> > > which is a changing set of topics thats stored in an external system
> > > outside of kafka.
> > > if a topic is taken out of the mirror set the MM offset would be
> "frozen"
> > > at that point and prevent clean-up for all eternity, unless its
> > cleaned-up
> > > itself.
> > >
> > > ...
> > >
> > > complexity :-)
> > >
> > > On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com>
> > wrote:
> > >
> > > > in summary - i'm not opposed to the idea of a per-topic clean up
> config
> > > > that tracks some set of consumer groups' offsets (which would
> probably
> > > work
> > > > for 80% of use cases), but i definitely see a need to expose a simple
> > API
> > > > for the more advanced/obscure/custom use cases (the other 20%).
> > > >
> > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com>
> > > wrote:
> > > >
> > > >> a major motivation for this KIP is cost savings.
> > > >>
> > > >> lots of internal systems at LI use kafka as an intermediate pipe,
> and
> > > set
> > > >> the topic retention period to a "safe enough" amount of time to be
> > able
> > > to
> > > >> recover from crashes/downtime and catch up to "now". this results
> in a
> > > few
> > > >> days' worth of retention typically.
> > > >>
> > > >> however, under normal operating conditions the consumers are mostly
> > > >> caught-up and so early clean-up enables a big cost savings in
> storage.
> > > >>
> > > >> as for my points:
> > > >>
> > > >> 1. when discussing implementation options for automatic clean-up we
> > > >> realized that cleaning up by keeping track of offsets stored in
> kafka
> > > >> requires some per-topic config - you need to specify which groups to
> > > track.
> > > >> this becomes a problem because:
> > > >>     1.1 - relatively complicated code, to be written in the broker.
> > > >>     1.2 - configuration needs to be maintained up to date by topic
> > > >> "owners" - of which we have thousands. failure to do so would
> decrease
> > > the
> > > >> cost benefit.
> > > >>     1.3 - some applications have a "reconsume" / "reinit" /
> > "bootstrap"
> > > >> workflow where they will reset their offsets to an earlier value
> than
> > > the
> > > >> one stored. this means that a stored offset of X does not always
> mean
> > > you
> > > >> can clean up to X-1. think of it as video encoding -some apps have
> > "key
> > > >> frames" they may seek back to which are before their current offset.
> > > >>     1.4 - there are multiple possible strategies - you could clean
> up
> > > >> aggressively, retain some "time distance" from latest, some "offset
> > > >> distance", etc. this we think would have made it very hard to agree
> > on a
> > > >> single "correct" implementation that everyone would be happy with.
> it
> > > would
> > > >> be better to include the raw functionality in the API and leave the
> > > >> "brains" to an external monitoring system where people could
> > > custom-taylor
> > > >> their logic
> > > >>
> > > >> 2. ad-hoc consumer groups: its common practice for devs to spin up
> > > >> console consumers and connect to a topic as a debug aid. SREs may
> also
> > > do
> > > >> this. there are also various other eco-system applications that may
> > > >> consumer from topics (unknown to topic owners as those are infra
> > > monitoring
> > > >> tools). obviously such consumer-groups' offsets should be ignored
> for
> > > >> purposes of clean-up, but coming up with a bullet-proof way to do
> this
> > > is
> > > >> non-trivial and again ties with implementation complexity and
> > > inflexibility
> > > >> of a "one size fits all" solution in 1.4 above.
> > > >>
> > > >> 3. forceful clean-up: we have workflows that use kafka to move
> > gigantic
> > > >> blobs from offline hadoop processing flows into systems. the data
> > being
> > > >> "loaded" into such an online system can be several GBs in side and
> > take
> > > a
> > > >> long time to consume (they are sliced into many small msgs).
> sometimes
> > > the
> > > >> sender wants to abort and start a new blob before the current load
> > > process
> > > >> has completed - meaning the consumer's offsets are not yet caught
> up.
> > > >>
> > > >> 4. offsets outside of kafka: yes, you could force applications to
> > store
> > > >> their offsets twice, but thats inefficient. its better to expose a
> > raw,
> > > >> simple API and let such applications manage their own clean-up logic
> > > (this
> > > >> again ties into 1.4 and no "one size fits all" solution)
> > > >>
> > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com>
> > wrote:
> > > >>
> > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> > > >>> ewen@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com>
> > > wrote:
> > > >>> >
> > > >>> > > Hey Ewen,
> > > >>> > >
> > > >>> > > Thanks for the review. As Radai explained, it would be complex
> in
> > > >>> terms
> > > >>> > of
> > > >>> > > user configuration if we were to use committed offset to decide
> > > data
> > > >>> > > deletion. We need a way to specify which groups need to consume
> > > data
> > > >>> of
> > > >>> > > this partition. The broker will also need to consume the entire
> > > >>> offsets
> > > >>> > > topic in that approach which has some overhead. I don't think
> it
> > is
> > > >>> that
> > > >>> > > hard to implement. But it will likely take more time to discuss
> > > that
> > > >>> > > approach due to the new config and the server side overhead.
> > > >>> > >
> > > >>> > > We choose to put this API in AdminClient because the API is
> more
> > > >>> like an
> > > >>> > > administrative operation (such as listGroups, deleteTopics)
> than
> > a
> > > >>> > consumer
> > > >>> > > operation. It is not necessarily called by consumer only. For
> > > >>> example, we
> > > >>> > > can implement the "delete data before committed offset"
> approach
> > by
> > > >>> > running
> > > >>> > > an external service which calls purgeDataBefore() API based on
> > > >>> committed
> > > >>> > > offset of consumer groups.
> > > >>> > >
> > > >>> > > I am not aware that AdminClient is not a public API. Suppose it
> > is
> > > >>> not
> > > >>> > > public now, I assume we plan to make it public in the future as
> > > part
> > > >>> of
> > > >>> > > KIP-4. Are we not making it public because its interface is not
> > > >>> stable?
> > > >>> > If
> > > >>> > > so, can we just tag this new API as not stable in the code?
> > > >>> > >
> > > >>> >
> > > >>> >
> > > >>> > The AdminClient planned for KIP-4 is a new Java-based
> > implementation.
> > > >>> It's
> > > >>> > definitely confusing that both will be (could be?) named
> > AdminClient,
> > > >>> but
> > > >>> > we've kept the existing Scala AdminClient out of the public API
> and
> > > >>> have
> > > >>> > not required KIPs for changes to it.
> > > >>> >
> > > >>> > That said, I agree that if this type of API makes it into Kafka,
> > > >>> having a
> > > >>> > (new, Java-based) AdminClient method would definitely be a good
> > idea.
> > > >>> An
> > > >>> > alternative path might be to have a Consumer-based implementation
> > > since
> > > >>> > that seems like a very intuitive, natural way to use the
> protocol.
> > I
> > > >>> think
> > > >>> > optimizing for the expected use case would be a good idea.
> > > >>> >
> > > >>> > -Ewen
> > > >>> >
> > > >>> > Are you saying that the Scala AdminClient is not a public API and
> > we
> > > >>> discourage addition of any new feature to this class?
> > > >>>
> > > >>> I still prefer to add it to AdminClient (Java version in the future
> > and
> > > >>> Scala version in the short team) because I feel it belongs to admin
> > > >>> operation instead of KafkaConsumer interface. For example, if in
> the
> > > >>> future
> > > >>> we implement the "delete data before committed offset" strategy in
> an
> > > >>> external service, I feel it is a bit awkward if the service has to
> > > >>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(
> > > ...)
> > > >>> to
> > > >>> purge data. In other words, our expected use-case doesn't
> necessarily
> > > >>> bind
> > > >>> this API with consumer.
> > > >>>
> > > >>> I am not strong on this issue. Let's see what other
> > > committers/developers
> > > >>> think about this.
> > > >>>
> > > >>>
> > > >>> >
> > > >>> > >
> > > >>> > > Thanks,
> > > >>> > > Dong
> > > >>> > >
> > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> > > >>> ewen@confluent.io
> > > >>> > >
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > > > Dong,
> > > >>> > > >
> > > >>> > > > Looks like that's an internal link,
> > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > >>> > > > is the right one.
> > > >>> > > >
> > > >>> > > > I have a question about one of the rejected alternatives:
> > > >>> > > >
> > > >>> > > > > Using committed offset instead of an extra API to trigger
> > data
> > > >>> purge
> > > >>> > > > operation.
> > > >>> > > >
> > > >>> > > > The KIP says this would be more complicated to implement. Why
> > is
> > > >>> that?
> > > >>> > I
> > > >>> > > > think brokers would have to consume the entire offsets topic,
> > but
> > > >>> the
> > > >>> > > data
> > > >>> > > > stored in memory doesn't seem to change and applying this
> when
> > > >>> updated
> > > >>> > > > offsets are seen seems basically the same. It might also be
> > > >>> possible to
> > > >>> > > > make it work even with multiple consumer groups if that was
> > > desired
> > > >>> > > > (although that'd require tracking more data in memory) as a
> > > >>> > > generalization
> > > >>> > > > without requiring coordination between the consumer groups.
> > Given
> > > >>> the
> > > >>> > > > motivation, I'm assuming this was considered unnecessary
> since
> > > this
> > > >>> > > > specifically targets intermediate stream processing topics.
> > > >>> > > >
> > > >>> > > > Another question is why expose this via AdminClient (which
> > isn't
> > > >>> public
> > > >>> > > API
> > > >>> > > > afaik)? Why not, for example, expose it on the Consumer,
> which
> > is
> > > >>> > > > presumably where you'd want access to it since the
> > functionality
> > > >>> > depends
> > > >>> > > on
> > > >>> > > > the consumer actually having consumed the data?
> > > >>> > > >
> > > >>> > > > -Ewen
> > > >>> > > >
> > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <
> lindong28@gmail.com>
> > > >>> wrote:
> > > >>> > > >
> > > >>> > > > > Hi all,
> > > >>> > > > >
> > > >>> > > > > We created KIP-107 to propose addition of purgeDataBefore()
> > API
> > > >>> in
> > > >>> > > > > AdminClient.
> > > >>> > > > >
> > > >>> > > > > Please find the KIP wiki in the link
> > > https://iwww.corp.linkedin.
> > > >>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> > > >>> > > > design+proposal.
> > > >>> > > > > We
> > > >>> > > > > would love to hear your comments and suggestions.
> > > >>> > > > >
> > > >>> > > > > Thanks,
> > > >>> > > > > Dong
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hey Jun,

Thanks for reviewing the KIP!

1. The low_watermark will be checkpointed in a new file named
 "replication-low-watermark-checkpoint". It will have the same format as
the existing replication-offset-checkpoint file. This allows us the keep
the existing format of checkpoint files which maps TopicPartition to Long.
I just updated the "Public Interface" section in the KIP wiki to explain
this file.

2. I think using low_watermark from leader to trigger log retention in the
follower will work correctly in the sense that all messages with offset <
low_watermark can be deleted. But I am not sure that the efficiency is the
same, i.e. offset of messages which should be deleted (i.e. due to time or
size-based log retention policy) will be smaller than low_watermark from
the leader.

For example, say both the follower and the leader have messages with
offsets in range [0, 2000]. If the follower does log rolling slightly later
than leader, the segments on follower would be [0, 1001], [1002, 2000] and
segments on leader would be [0, 1000], [1001, 2000]. After leader deletes
the first segment, the low_watermark would be 1001. Thus the first segment
would stay on follower's disk unnecessarily which may double disk usage at
worst.

Since this approach doesn't save us much, I am inclined to not include this
change to keep the KIP simple.

Dong



On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the proposal. Looks good overall. A couple of comments.
>
> 1. Where is the low_watermark checkpointed? Is that
> in replication-offset-checkpoint? If so, do we need to bump up the version?
> Could you also describe the format change?
>
> 2. For topics with "delete" retention, currently we let each replica delete
> old segments independently. With low_watermark, we could just let leaders
> delete old segments through the deletion policy and the followers will
> simply delete old segments based on low_watermark. Not sure if this saves
> much, but is a potential option that may be worth thinking about.
>
> Jun
>
>
>
> On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com> wrote:
>
> > one more example of complicated config - mirror maker.
> >
> > we definitely cant trust each and every topic owner to configure their
> > topics not to purge before they've been mirrored.
> > which would mean there's a per-topic config (set by the owner) and a
> > "global" config (where mirror makers are specified) and they need to be
> > "merged".
> > for those topics that _are_ mirrored.
> > which is a changing set of topics thats stored in an external system
> > outside of kafka.
> > if a topic is taken out of the mirror set the MM offset would be "frozen"
> > at that point and prevent clean-up for all eternity, unless its
> cleaned-up
> > itself.
> >
> > ...
> >
> > complexity :-)
> >
> > On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com>
> wrote:
> >
> > > in summary - i'm not opposed to the idea of a per-topic clean up config
> > > that tracks some set of consumer groups' offsets (which would probably
> > work
> > > for 80% of use cases), but i definitely see a need to expose a simple
> API
> > > for the more advanced/obscure/custom use cases (the other 20%).
> > >
> > > On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com>
> > wrote:
> > >
> > >> a major motivation for this KIP is cost savings.
> > >>
> > >> lots of internal systems at LI use kafka as an intermediate pipe, and
> > set
> > >> the topic retention period to a "safe enough" amount of time to be
> able
> > to
> > >> recover from crashes/downtime and catch up to "now". this results in a
> > few
> > >> days' worth of retention typically.
> > >>
> > >> however, under normal operating conditions the consumers are mostly
> > >> caught-up and so early clean-up enables a big cost savings in storage.
> > >>
> > >> as for my points:
> > >>
> > >> 1. when discussing implementation options for automatic clean-up we
> > >> realized that cleaning up by keeping track of offsets stored in kafka
> > >> requires some per-topic config - you need to specify which groups to
> > track.
> > >> this becomes a problem because:
> > >>     1.1 - relatively complicated code, to be written in the broker.
> > >>     1.2 - configuration needs to be maintained up to date by topic
> > >> "owners" - of which we have thousands. failure to do so would decrease
> > the
> > >> cost benefit.
> > >>     1.3 - some applications have a "reconsume" / "reinit" /
> "bootstrap"
> > >> workflow where they will reset their offsets to an earlier value than
> > the
> > >> one stored. this means that a stored offset of X does not always mean
> > you
> > >> can clean up to X-1. think of it as video encoding -some apps have
> "key
> > >> frames" they may seek back to which are before their current offset.
> > >>     1.4 - there are multiple possible strategies - you could clean up
> > >> aggressively, retain some "time distance" from latest, some "offset
> > >> distance", etc. this we think would have made it very hard to agree
> on a
> > >> single "correct" implementation that everyone would be happy with. it
> > would
> > >> be better to include the raw functionality in the API and leave the
> > >> "brains" to an external monitoring system where people could
> > custom-taylor
> > >> their logic
> > >>
> > >> 2. ad-hoc consumer groups: its common practice for devs to spin up
> > >> console consumers and connect to a topic as a debug aid. SREs may also
> > do
> > >> this. there are also various other eco-system applications that may
> > >> consumer from topics (unknown to topic owners as those are infra
> > monitoring
> > >> tools). obviously such consumer-groups' offsets should be ignored for
> > >> purposes of clean-up, but coming up with a bullet-proof way to do this
> > is
> > >> non-trivial and again ties with implementation complexity and
> > inflexibility
> > >> of a "one size fits all" solution in 1.4 above.
> > >>
> > >> 3. forceful clean-up: we have workflows that use kafka to move
> gigantic
> > >> blobs from offline hadoop processing flows into systems. the data
> being
> > >> "loaded" into such an online system can be several GBs in side and
> take
> > a
> > >> long time to consume (they are sliced into many small msgs). sometimes
> > the
> > >> sender wants to abort and start a new blob before the current load
> > process
> > >> has completed - meaning the consumer's offsets are not yet caught up.
> > >>
> > >> 4. offsets outside of kafka: yes, you could force applications to
> store
> > >> their offsets twice, but thats inefficient. its better to expose a
> raw,
> > >> simple API and let such applications manage their own clean-up logic
> > (this
> > >> again ties into 1.4 and no "one size fits all" solution)
> > >>
> > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com>
> wrote:
> > >>
> > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> > >>> ewen@confluent.io>
> > >>> wrote:
> > >>>
> > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com>
> > wrote:
> > >>> >
> > >>> > > Hey Ewen,
> > >>> > >
> > >>> > > Thanks for the review. As Radai explained, it would be complex in
> > >>> terms
> > >>> > of
> > >>> > > user configuration if we were to use committed offset to decide
> > data
> > >>> > > deletion. We need a way to specify which groups need to consume
> > data
> > >>> of
> > >>> > > this partition. The broker will also need to consume the entire
> > >>> offsets
> > >>> > > topic in that approach which has some overhead. I don't think it
> is
> > >>> that
> > >>> > > hard to implement. But it will likely take more time to discuss
> > that
> > >>> > > approach due to the new config and the server side overhead.
> > >>> > >
> > >>> > > We choose to put this API in AdminClient because the API is more
> > >>> like an
> > >>> > > administrative operation (such as listGroups, deleteTopics) than
> a
> > >>> > consumer
> > >>> > > operation. It is not necessarily called by consumer only. For
> > >>> example, we
> > >>> > > can implement the "delete data before committed offset" approach
> by
> > >>> > running
> > >>> > > an external service which calls purgeDataBefore() API based on
> > >>> committed
> > >>> > > offset of consumer groups.
> > >>> > >
> > >>> > > I am not aware that AdminClient is not a public API. Suppose it
> is
> > >>> not
> > >>> > > public now, I assume we plan to make it public in the future as
> > part
> > >>> of
> > >>> > > KIP-4. Are we not making it public because its interface is not
> > >>> stable?
> > >>> > If
> > >>> > > so, can we just tag this new API as not stable in the code?
> > >>> > >
> > >>> >
> > >>> >
> > >>> > The AdminClient planned for KIP-4 is a new Java-based
> implementation.
> > >>> It's
> > >>> > definitely confusing that both will be (could be?) named
> AdminClient,
> > >>> but
> > >>> > we've kept the existing Scala AdminClient out of the public API and
> > >>> have
> > >>> > not required KIPs for changes to it.
> > >>> >
> > >>> > That said, I agree that if this type of API makes it into Kafka,
> > >>> having a
> > >>> > (new, Java-based) AdminClient method would definitely be a good
> idea.
> > >>> An
> > >>> > alternative path might be to have a Consumer-based implementation
> > since
> > >>> > that seems like a very intuitive, natural way to use the protocol.
> I
> > >>> think
> > >>> > optimizing for the expected use case would be a good idea.
> > >>> >
> > >>> > -Ewen
> > >>> >
> > >>> > Are you saying that the Scala AdminClient is not a public API and
> we
> > >>> discourage addition of any new feature to this class?
> > >>>
> > >>> I still prefer to add it to AdminClient (Java version in the future
> and
> > >>> Scala version in the short team) because I feel it belongs to admin
> > >>> operation instead of KafkaConsumer interface. For example, if in the
> > >>> future
> > >>> we implement the "delete data before committed offset" strategy in an
> > >>> external service, I feel it is a bit awkward if the service has to
> > >>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(
> > ...)
> > >>> to
> > >>> purge data. In other words, our expected use-case doesn't necessarily
> > >>> bind
> > >>> this API with consumer.
> > >>>
> > >>> I am not strong on this issue. Let's see what other
> > committers/developers
> > >>> think about this.
> > >>>
> > >>>
> > >>> >
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Dong
> > >>> > >
> > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> > >>> ewen@confluent.io
> > >>> > >
> > >>> > > wrote:
> > >>> > >
> > >>> > > > Dong,
> > >>> > > >
> > >>> > > > Looks like that's an internal link,
> > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > >>> > > > is the right one.
> > >>> > > >
> > >>> > > > I have a question about one of the rejected alternatives:
> > >>> > > >
> > >>> > > > > Using committed offset instead of an extra API to trigger
> data
> > >>> purge
> > >>> > > > operation.
> > >>> > > >
> > >>> > > > The KIP says this would be more complicated to implement. Why
> is
> > >>> that?
> > >>> > I
> > >>> > > > think brokers would have to consume the entire offsets topic,
> but
> > >>> the
> > >>> > > data
> > >>> > > > stored in memory doesn't seem to change and applying this when
> > >>> updated
> > >>> > > > offsets are seen seems basically the same. It might also be
> > >>> possible to
> > >>> > > > make it work even with multiple consumer groups if that was
> > desired
> > >>> > > > (although that'd require tracking more data in memory) as a
> > >>> > > generalization
> > >>> > > > without requiring coordination between the consumer groups.
> Given
> > >>> the
> > >>> > > > motivation, I'm assuming this was considered unnecessary since
> > this
> > >>> > > > specifically targets intermediate stream processing topics.
> > >>> > > >
> > >>> > > > Another question is why expose this via AdminClient (which
> isn't
> > >>> public
> > >>> > > API
> > >>> > > > afaik)? Why not, for example, expose it on the Consumer, which
> is
> > >>> > > > presumably where you'd want access to it since the
> functionality
> > >>> > depends
> > >>> > > on
> > >>> > > > the consumer actually having consumed the data?
> > >>> > > >
> > >>> > > > -Ewen
> > >>> > > >
> > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com>
> > >>> wrote:
> > >>> > > >
> > >>> > > > > Hi all,
> > >>> > > > >
> > >>> > > > > We created KIP-107 to propose addition of purgeDataBefore()
> API
> > >>> in
> > >>> > > > > AdminClient.
> > >>> > > > >
> > >>> > > > > Please find the KIP wiki in the link
> > https://iwww.corp.linkedin.
> > >>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> > >>> > > > design+proposal.
> > >>> > > > > We
> > >>> > > > > would love to hear your comments and suggestions.
> > >>> > > > >
> > >>> > > > > Thanks,
> > >>> > > > > Dong
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Jun Rao <ju...@confluent.io>.
Hi, Dong,

Thanks for the proposal. Looks good overall. A couple of comments.

1. Where is the low_watermark checkpointed? Is that
in replication-offset-checkpoint? If so, do we need to bump up the version?
Could you also describe the format change?

2. For topics with "delete" retention, currently we let each replica delete
old segments independently. With low_watermark, we could just let leaders
delete old segments through the deletion policy and the followers will
simply delete old segments based on low_watermark. Not sure if this saves
much, but is a potential option that may be worth thinking about.

Jun



On Wed, Jan 4, 2017 at 8:13 AM, radai <ra...@gmail.com> wrote:

> one more example of complicated config - mirror maker.
>
> we definitely cant trust each and every topic owner to configure their
> topics not to purge before they've been mirrored.
> which would mean there's a per-topic config (set by the owner) and a
> "global" config (where mirror makers are specified) and they need to be
> "merged".
> for those topics that _are_ mirrored.
> which is a changing set of topics thats stored in an external system
> outside of kafka.
> if a topic is taken out of the mirror set the MM offset would be "frozen"
> at that point and prevent clean-up for all eternity, unless its cleaned-up
> itself.
>
> ...
>
> complexity :-)
>
> On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com> wrote:
>
> > in summary - i'm not opposed to the idea of a per-topic clean up config
> > that tracks some set of consumer groups' offsets (which would probably
> work
> > for 80% of use cases), but i definitely see a need to expose a simple API
> > for the more advanced/obscure/custom use cases (the other 20%).
> >
> > On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com>
> wrote:
> >
> >> a major motivation for this KIP is cost savings.
> >>
> >> lots of internal systems at LI use kafka as an intermediate pipe, and
> set
> >> the topic retention period to a "safe enough" amount of time to be able
> to
> >> recover from crashes/downtime and catch up to "now". this results in a
> few
> >> days' worth of retention typically.
> >>
> >> however, under normal operating conditions the consumers are mostly
> >> caught-up and so early clean-up enables a big cost savings in storage.
> >>
> >> as for my points:
> >>
> >> 1. when discussing implementation options for automatic clean-up we
> >> realized that cleaning up by keeping track of offsets stored in kafka
> >> requires some per-topic config - you need to specify which groups to
> track.
> >> this becomes a problem because:
> >>     1.1 - relatively complicated code, to be written in the broker.
> >>     1.2 - configuration needs to be maintained up to date by topic
> >> "owners" - of which we have thousands. failure to do so would decrease
> the
> >> cost benefit.
> >>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> >> workflow where they will reset their offsets to an earlier value than
> the
> >> one stored. this means that a stored offset of X does not always mean
> you
> >> can clean up to X-1. think of it as video encoding -some apps have "key
> >> frames" they may seek back to which are before their current offset.
> >>     1.4 - there are multiple possible strategies - you could clean up
> >> aggressively, retain some "time distance" from latest, some "offset
> >> distance", etc. this we think would have made it very hard to agree on a
> >> single "correct" implementation that everyone would be happy with. it
> would
> >> be better to include the raw functionality in the API and leave the
> >> "brains" to an external monitoring system where people could
> custom-taylor
> >> their logic
> >>
> >> 2. ad-hoc consumer groups: its common practice for devs to spin up
> >> console consumers and connect to a topic as a debug aid. SREs may also
> do
> >> this. there are also various other eco-system applications that may
> >> consumer from topics (unknown to topic owners as those are infra
> monitoring
> >> tools). obviously such consumer-groups' offsets should be ignored for
> >> purposes of clean-up, but coming up with a bullet-proof way to do this
> is
> >> non-trivial and again ties with implementation complexity and
> inflexibility
> >> of a "one size fits all" solution in 1.4 above.
> >>
> >> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> >> blobs from offline hadoop processing flows into systems. the data being
> >> "loaded" into such an online system can be several GBs in side and take
> a
> >> long time to consume (they are sliced into many small msgs). sometimes
> the
> >> sender wants to abort and start a new blob before the current load
> process
> >> has completed - meaning the consumer's offsets are not yet caught up.
> >>
> >> 4. offsets outside of kafka: yes, you could force applications to store
> >> their offsets twice, but thats inefficient. its better to expose a raw,
> >> simple API and let such applications manage their own clean-up logic
> (this
> >> again ties into 1.4 and no "one size fits all" solution)
> >>
> >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com> wrote:
> >>
> >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
> >>> ewen@confluent.io>
> >>> wrote:
> >>>
> >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com>
> wrote:
> >>> >
> >>> > > Hey Ewen,
> >>> > >
> >>> > > Thanks for the review. As Radai explained, it would be complex in
> >>> terms
> >>> > of
> >>> > > user configuration if we were to use committed offset to decide
> data
> >>> > > deletion. We need a way to specify which groups need to consume
> data
> >>> of
> >>> > > this partition. The broker will also need to consume the entire
> >>> offsets
> >>> > > topic in that approach which has some overhead. I don't think it is
> >>> that
> >>> > > hard to implement. But it will likely take more time to discuss
> that
> >>> > > approach due to the new config and the server side overhead.
> >>> > >
> >>> > > We choose to put this API in AdminClient because the API is more
> >>> like an
> >>> > > administrative operation (such as listGroups, deleteTopics) than a
> >>> > consumer
> >>> > > operation. It is not necessarily called by consumer only. For
> >>> example, we
> >>> > > can implement the "delete data before committed offset" approach by
> >>> > running
> >>> > > an external service which calls purgeDataBefore() API based on
> >>> committed
> >>> > > offset of consumer groups.
> >>> > >
> >>> > > I am not aware that AdminClient is not a public API. Suppose it is
> >>> not
> >>> > > public now, I assume we plan to make it public in the future as
> part
> >>> of
> >>> > > KIP-4. Are we not making it public because its interface is not
> >>> stable?
> >>> > If
> >>> > > so, can we just tag this new API as not stable in the code?
> >>> > >
> >>> >
> >>> >
> >>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
> >>> It's
> >>> > definitely confusing that both will be (could be?) named AdminClient,
> >>> but
> >>> > we've kept the existing Scala AdminClient out of the public API and
> >>> have
> >>> > not required KIPs for changes to it.
> >>> >
> >>> > That said, I agree that if this type of API makes it into Kafka,
> >>> having a
> >>> > (new, Java-based) AdminClient method would definitely be a good idea.
> >>> An
> >>> > alternative path might be to have a Consumer-based implementation
> since
> >>> > that seems like a very intuitive, natural way to use the protocol. I
> >>> think
> >>> > optimizing for the expected use case would be a good idea.
> >>> >
> >>> > -Ewen
> >>> >
> >>> > Are you saying that the Scala AdminClient is not a public API and we
> >>> discourage addition of any new feature to this class?
> >>>
> >>> I still prefer to add it to AdminClient (Java version in the future and
> >>> Scala version in the short team) because I feel it belongs to admin
> >>> operation instead of KafkaConsumer interface. For example, if in the
> >>> future
> >>> we implement the "delete data before committed offset" strategy in an
> >>> external service, I feel it is a bit awkward if the service has to
> >>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(
> ...)
> >>> to
> >>> purge data. In other words, our expected use-case doesn't necessarily
> >>> bind
> >>> this API with consumer.
> >>>
> >>> I am not strong on this issue. Let's see what other
> committers/developers
> >>> think about this.
> >>>
> >>>
> >>> >
> >>> > >
> >>> > > Thanks,
> >>> > > Dong
> >>> > >
> >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> >>> ewen@confluent.io
> >>> > >
> >>> > > wrote:
> >>> > >
> >>> > > > Dong,
> >>> > > >
> >>> > > > Looks like that's an internal link,
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> >>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> >>> > > > is the right one.
> >>> > > >
> >>> > > > I have a question about one of the rejected alternatives:
> >>> > > >
> >>> > > > > Using committed offset instead of an extra API to trigger data
> >>> purge
> >>> > > > operation.
> >>> > > >
> >>> > > > The KIP says this would be more complicated to implement. Why is
> >>> that?
> >>> > I
> >>> > > > think brokers would have to consume the entire offsets topic, but
> >>> the
> >>> > > data
> >>> > > > stored in memory doesn't seem to change and applying this when
> >>> updated
> >>> > > > offsets are seen seems basically the same. It might also be
> >>> possible to
> >>> > > > make it work even with multiple consumer groups if that was
> desired
> >>> > > > (although that'd require tracking more data in memory) as a
> >>> > > generalization
> >>> > > > without requiring coordination between the consumer groups. Given
> >>> the
> >>> > > > motivation, I'm assuming this was considered unnecessary since
> this
> >>> > > > specifically targets intermediate stream processing topics.
> >>> > > >
> >>> > > > Another question is why expose this via AdminClient (which isn't
> >>> public
> >>> > > API
> >>> > > > afaik)? Why not, for example, expose it on the Consumer, which is
> >>> > > > presumably where you'd want access to it since the functionality
> >>> > depends
> >>> > > on
> >>> > > > the consumer actually having consumed the data?
> >>> > > >
> >>> > > > -Ewen
> >>> > > >
> >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > > Hi all,
> >>> > > > >
> >>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API
> >>> in
> >>> > > > > AdminClient.
> >>> > > > >
> >>> > > > > Please find the KIP wiki in the link
> https://iwww.corp.linkedin.
> >>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> >>> > > > design+proposal.
> >>> > > > > We
> >>> > > > > would love to hear your comments and suggestions.
> >>> > > > >
> >>> > > > > Thanks,
> >>> > > > > Dong
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by radai <ra...@gmail.com>.
one more example of complicated config - mirror maker.

we definitely cant trust each and every topic owner to configure their
topics not to purge before they've been mirrored.
which would mean there's a per-topic config (set by the owner) and a
"global" config (where mirror makers are specified) and they need to be
"merged".
for those topics that _are_ mirrored.
which is a changing set of topics thats stored in an external system
outside of kafka.
if a topic is taken out of the mirror set the MM offset would be "frozen"
at that point and prevent clean-up for all eternity, unless its cleaned-up
itself.

...

complexity :-)

On Wed, Jan 4, 2017 at 8:04 AM, radai <ra...@gmail.com> wrote:

> in summary - i'm not opposed to the idea of a per-topic clean up config
> that tracks some set of consumer groups' offsets (which would probably work
> for 80% of use cases), but i definitely see a need to expose a simple API
> for the more advanced/obscure/custom use cases (the other 20%).
>
> On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com> wrote:
>
>> a major motivation for this KIP is cost savings.
>>
>> lots of internal systems at LI use kafka as an intermediate pipe, and set
>> the topic retention period to a "safe enough" amount of time to be able to
>> recover from crashes/downtime and catch up to "now". this results in a few
>> days' worth of retention typically.
>>
>> however, under normal operating conditions the consumers are mostly
>> caught-up and so early clean-up enables a big cost savings in storage.
>>
>> as for my points:
>>
>> 1. when discussing implementation options for automatic clean-up we
>> realized that cleaning up by keeping track of offsets stored in kafka
>> requires some per-topic config - you need to specify which groups to track.
>> this becomes a problem because:
>>     1.1 - relatively complicated code, to be written in the broker.
>>     1.2 - configuration needs to be maintained up to date by topic
>> "owners" - of which we have thousands. failure to do so would decrease the
>> cost benefit.
>>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
>> workflow where they will reset their offsets to an earlier value than the
>> one stored. this means that a stored offset of X does not always mean you
>> can clean up to X-1. think of it as video encoding -some apps have "key
>> frames" they may seek back to which are before their current offset.
>>     1.4 - there are multiple possible strategies - you could clean up
>> aggressively, retain some "time distance" from latest, some "offset
>> distance", etc. this we think would have made it very hard to agree on a
>> single "correct" implementation that everyone would be happy with. it would
>> be better to include the raw functionality in the API and leave the
>> "brains" to an external monitoring system where people could custom-taylor
>> their logic
>>
>> 2. ad-hoc consumer groups: its common practice for devs to spin up
>> console consumers and connect to a topic as a debug aid. SREs may also do
>> this. there are also various other eco-system applications that may
>> consumer from topics (unknown to topic owners as those are infra monitoring
>> tools). obviously such consumer-groups' offsets should be ignored for
>> purposes of clean-up, but coming up with a bullet-proof way to do this is
>> non-trivial and again ties with implementation complexity and inflexibility
>> of a "one size fits all" solution in 1.4 above.
>>
>> 3. forceful clean-up: we have workflows that use kafka to move gigantic
>> blobs from offline hadoop processing flows into systems. the data being
>> "loaded" into such an online system can be several GBs in side and take a
>> long time to consume (they are sliced into many small msgs). sometimes the
>> sender wants to abort and start a new blob before the current load process
>> has completed - meaning the consumer's offsets are not yet caught up.
>>
>> 4. offsets outside of kafka: yes, you could force applications to store
>> their offsets twice, but thats inefficient. its better to expose a raw,
>> simple API and let such applications manage their own clean-up logic (this
>> again ties into 1.4 and no "one size fits all" solution)
>>
>> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com> wrote:
>>
>>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <
>>> ewen@confluent.io>
>>> wrote:
>>>
>>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com> wrote:
>>> >
>>> > > Hey Ewen,
>>> > >
>>> > > Thanks for the review. As Radai explained, it would be complex in
>>> terms
>>> > of
>>> > > user configuration if we were to use committed offset to decide data
>>> > > deletion. We need a way to specify which groups need to consume data
>>> of
>>> > > this partition. The broker will also need to consume the entire
>>> offsets
>>> > > topic in that approach which has some overhead. I don't think it is
>>> that
>>> > > hard to implement. But it will likely take more time to discuss that
>>> > > approach due to the new config and the server side overhead.
>>> > >
>>> > > We choose to put this API in AdminClient because the API is more
>>> like an
>>> > > administrative operation (such as listGroups, deleteTopics) than a
>>> > consumer
>>> > > operation. It is not necessarily called by consumer only. For
>>> example, we
>>> > > can implement the "delete data before committed offset" approach by
>>> > running
>>> > > an external service which calls purgeDataBefore() API based on
>>> committed
>>> > > offset of consumer groups.
>>> > >
>>> > > I am not aware that AdminClient is not a public API. Suppose it is
>>> not
>>> > > public now, I assume we plan to make it public in the future as part
>>> of
>>> > > KIP-4. Are we not making it public because its interface is not
>>> stable?
>>> > If
>>> > > so, can we just tag this new API as not stable in the code?
>>> > >
>>> >
>>> >
>>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
>>> It's
>>> > definitely confusing that both will be (could be?) named AdminClient,
>>> but
>>> > we've kept the existing Scala AdminClient out of the public API and
>>> have
>>> > not required KIPs for changes to it.
>>> >
>>> > That said, I agree that if this type of API makes it into Kafka,
>>> having a
>>> > (new, Java-based) AdminClient method would definitely be a good idea.
>>> An
>>> > alternative path might be to have a Consumer-based implementation since
>>> > that seems like a very intuitive, natural way to use the protocol. I
>>> think
>>> > optimizing for the expected use case would be a good idea.
>>> >
>>> > -Ewen
>>> >
>>> > Are you saying that the Scala AdminClient is not a public API and we
>>> discourage addition of any new feature to this class?
>>>
>>> I still prefer to add it to AdminClient (Java version in the future and
>>> Scala version in the short team) because I feel it belongs to admin
>>> operation instead of KafkaConsumer interface. For example, if in the
>>> future
>>> we implement the "delete data before committed offset" strategy in an
>>> external service, I feel it is a bit awkward if the service has to
>>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...)
>>> to
>>> purge data. In other words, our expected use-case doesn't necessarily
>>> bind
>>> this API with consumer.
>>>
>>> I am not strong on this issue. Let's see what other committers/developers
>>> think about this.
>>>
>>>
>>> >
>>> > >
>>> > > Thanks,
>>> > > Dong
>>> > >
>>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
>>> ewen@confluent.io
>>> > >
>>> > > wrote:
>>> > >
>>> > > > Dong,
>>> > > >
>>> > > > Looks like that's an internal link,
>>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
>>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>>> > > > is the right one.
>>> > > >
>>> > > > I have a question about one of the rejected alternatives:
>>> > > >
>>> > > > > Using committed offset instead of an extra API to trigger data
>>> purge
>>> > > > operation.
>>> > > >
>>> > > > The KIP says this would be more complicated to implement. Why is
>>> that?
>>> > I
>>> > > > think brokers would have to consume the entire offsets topic, but
>>> the
>>> > > data
>>> > > > stored in memory doesn't seem to change and applying this when
>>> updated
>>> > > > offsets are seen seems basically the same. It might also be
>>> possible to
>>> > > > make it work even with multiple consumer groups if that was desired
>>> > > > (although that'd require tracking more data in memory) as a
>>> > > generalization
>>> > > > without requiring coordination between the consumer groups. Given
>>> the
>>> > > > motivation, I'm assuming this was considered unnecessary since this
>>> > > > specifically targets intermediate stream processing topics.
>>> > > >
>>> > > > Another question is why expose this via AdminClient (which isn't
>>> public
>>> > > API
>>> > > > afaik)? Why not, for example, expose it on the Consumer, which is
>>> > > > presumably where you'd want access to it since the functionality
>>> > depends
>>> > > on
>>> > > > the consumer actually having consumed the data?
>>> > > >
>>> > > > -Ewen
>>> > > >
>>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com>
>>> wrote:
>>> > > >
>>> > > > > Hi all,
>>> > > > >
>>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API
>>> in
>>> > > > > AdminClient.
>>> > > > >
>>> > > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.
>>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
>>> > > > design+proposal.
>>> > > > > We
>>> > > > > would love to hear your comments and suggestions.
>>> > > > >
>>> > > > > Thanks,
>>> > > > > Dong
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by radai <ra...@gmail.com>.
in summary - i'm not opposed to the idea of a per-topic clean up config
that tracks some set of consumer groups' offsets (which would probably work
for 80% of use cases), but i definitely see a need to expose a simple API
for the more advanced/obscure/custom use cases (the other 20%).

On Wed, Jan 4, 2017 at 7:54 AM, radai <ra...@gmail.com> wrote:

> a major motivation for this KIP is cost savings.
>
> lots of internal systems at LI use kafka as an intermediate pipe, and set
> the topic retention period to a "safe enough" amount of time to be able to
> recover from crashes/downtime and catch up to "now". this results in a few
> days' worth of retention typically.
>
> however, under normal operating conditions the consumers are mostly
> caught-up and so early clean-up enables a big cost savings in storage.
>
> as for my points:
>
> 1. when discussing implementation options for automatic clean-up we
> realized that cleaning up by keeping track of offsets stored in kafka
> requires some per-topic config - you need to specify which groups to track.
> this becomes a problem because:
>     1.1 - relatively complicated code, to be written in the broker.
>     1.2 - configuration needs to be maintained up to date by topic
> "owners" - of which we have thousands. failure to do so would decrease the
> cost benefit.
>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> workflow where they will reset their offsets to an earlier value than the
> one stored. this means that a stored offset of X does not always mean you
> can clean up to X-1. think of it as video encoding -some apps have "key
> frames" they may seek back to which are before their current offset.
>     1.4 - there are multiple possible strategies - you could clean up
> aggressively, retain some "time distance" from latest, some "offset
> distance", etc. this we think would have made it very hard to agree on a
> single "correct" implementation that everyone would be happy with. it would
> be better to include the raw functionality in the API and leave the
> "brains" to an external monitoring system where people could custom-taylor
> their logic
>
> 2. ad-hoc consumer groups: its common practice for devs to spin up console
> consumers and connect to a topic as a debug aid. SREs may also do this.
> there are also various other eco-system applications that may consumer from
> topics (unknown to topic owners as those are infra monitoring tools).
> obviously such consumer-groups' offsets should be ignored for purposes of
> clean-up, but coming up with a bullet-proof way to do this is non-trivial
> and again ties with implementation complexity and inflexibility of a "one
> size fits all" solution in 1.4 above.
>
> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> blobs from offline hadoop processing flows into systems. the data being
> "loaded" into such an online system can be several GBs in side and take a
> long time to consume (they are sliced into many small msgs). sometimes the
> sender wants to abort and start a new blob before the current load process
> has completed - meaning the consumer's offsets are not yet caught up.
>
> 4. offsets outside of kafka: yes, you could force applications to store
> their offsets twice, but thats inefficient. its better to expose a raw,
> simple API and let such applications manage their own clean-up logic (this
> again ties into 1.4 and no "one size fits all" solution)
>
> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com> wrote:
>
>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ewen@confluent.io
>> >
>> wrote:
>>
>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com> wrote:
>> >
>> > > Hey Ewen,
>> > >
>> > > Thanks for the review. As Radai explained, it would be complex in
>> terms
>> > of
>> > > user configuration if we were to use committed offset to decide data
>> > > deletion. We need a way to specify which groups need to consume data
>> of
>> > > this partition. The broker will also need to consume the entire
>> offsets
>> > > topic in that approach which has some overhead. I don't think it is
>> that
>> > > hard to implement. But it will likely take more time to discuss that
>> > > approach due to the new config and the server side overhead.
>> > >
>> > > We choose to put this API in AdminClient because the API is more like
>> an
>> > > administrative operation (such as listGroups, deleteTopics) than a
>> > consumer
>> > > operation. It is not necessarily called by consumer only. For
>> example, we
>> > > can implement the "delete data before committed offset" approach by
>> > running
>> > > an external service which calls purgeDataBefore() API based on
>> committed
>> > > offset of consumer groups.
>> > >
>> > > I am not aware that AdminClient is not a public API. Suppose it is not
>> > > public now, I assume we plan to make it public in the future as part
>> of
>> > > KIP-4. Are we not making it public because its interface is not
>> stable?
>> > If
>> > > so, can we just tag this new API as not stable in the code?
>> > >
>> >
>> >
>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
>> It's
>> > definitely confusing that both will be (could be?) named AdminClient,
>> but
>> > we've kept the existing Scala AdminClient out of the public API and have
>> > not required KIPs for changes to it.
>> >
>> > That said, I agree that if this type of API makes it into Kafka, having
>> a
>> > (new, Java-based) AdminClient method would definitely be a good idea. An
>> > alternative path might be to have a Consumer-based implementation since
>> > that seems like a very intuitive, natural way to use the protocol. I
>> think
>> > optimizing for the expected use case would be a good idea.
>> >
>> > -Ewen
>> >
>> > Are you saying that the Scala AdminClient is not a public API and we
>> discourage addition of any new feature to this class?
>>
>> I still prefer to add it to AdminClient (Java version in the future and
>> Scala version in the short team) because I feel it belongs to admin
>> operation instead of KafkaConsumer interface. For example, if in the
>> future
>> we implement the "delete data before committed offset" strategy in an
>> external service, I feel it is a bit awkward if the service has to
>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...)
>> to
>> purge data. In other words, our expected use-case doesn't necessarily bind
>> this API with consumer.
>>
>> I am not strong on this issue. Let's see what other committers/developers
>> think about this.
>>
>>
>> >
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
>> ewen@confluent.io
>> > >
>> > > wrote:
>> > >
>> > > > Dong,
>> > > >
>> > > > Looks like that's an internal link,
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
>> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> > > > is the right one.
>> > > >
>> > > > I have a question about one of the rejected alternatives:
>> > > >
>> > > > > Using committed offset instead of an extra API to trigger data
>> purge
>> > > > operation.
>> > > >
>> > > > The KIP says this would be more complicated to implement. Why is
>> that?
>> > I
>> > > > think brokers would have to consume the entire offsets topic, but
>> the
>> > > data
>> > > > stored in memory doesn't seem to change and applying this when
>> updated
>> > > > offsets are seen seems basically the same. It might also be
>> possible to
>> > > > make it work even with multiple consumer groups if that was desired
>> > > > (although that'd require tracking more data in memory) as a
>> > > generalization
>> > > > without requiring coordination between the consumer groups. Given
>> the
>> > > > motivation, I'm assuming this was considered unnecessary since this
>> > > > specifically targets intermediate stream processing topics.
>> > > >
>> > > > Another question is why expose this via AdminClient (which isn't
>> public
>> > > API
>> > > > afaik)? Why not, for example, expose it on the Consumer, which is
>> > > > presumably where you'd want access to it since the functionality
>> > depends
>> > > on
>> > > > the consumer actually having consumed the data?
>> > > >
>> > > > -Ewen
>> > > >
>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
>> > > > > AdminClient.
>> > > > >
>> > > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.
>> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
>> > > > design+proposal.
>> > > > > We
>> > > > > would love to hear your comments and suggestions.
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by radai <ra...@gmail.com>.
a major motivation for this KIP is cost savings.

lots of internal systems at LI use kafka as an intermediate pipe, and set
the topic retention period to a "safe enough" amount of time to be able to
recover from crashes/downtime and catch up to "now". this results in a few
days' worth of retention typically.

however, under normal operating conditions the consumers are mostly
caught-up and so early clean-up enables a big cost savings in storage.

as for my points:

1. when discussing implementation options for automatic clean-up we
realized that cleaning up by keeping track of offsets stored in kafka
requires some per-topic config - you need to specify which groups to track.
this becomes a problem because:
    1.1 - relatively complicated code, to be written in the broker.
    1.2 - configuration needs to be maintained up to date by topic "owners"
- of which we have thousands. failure to do so would decrease the cost
benefit.
    1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
workflow where they will reset their offsets to an earlier value than the
one stored. this means that a stored offset of X does not always mean you
can clean up to X-1. think of it as video encoding -some apps have "key
frames" they may seek back to which are before their current offset.
    1.4 - there are multiple possible strategies - you could clean up
aggressively, retain some "time distance" from latest, some "offset
distance", etc. this we think would have made it very hard to agree on a
single "correct" implementation that everyone would be happy with. it would
be better to include the raw functionality in the API and leave the
"brains" to an external monitoring system where people could custom-taylor
their logic

2. ad-hoc consumer groups: its common practice for devs to spin up console
consumers and connect to a topic as a debug aid. SREs may also do this.
there are also various other eco-system applications that may consumer from
topics (unknown to topic owners as those are infra monitoring tools).
obviously such consumer-groups' offsets should be ignored for purposes of
clean-up, but coming up with a bullet-proof way to do this is non-trivial
and again ties with implementation complexity and inflexibility of a "one
size fits all" solution in 1.4 above.

3. forceful clean-up: we have workflows that use kafka to move gigantic
blobs from offline hadoop processing flows into systems. the data being
"loaded" into such an online system can be several GBs in side and take a
long time to consume (they are sliced into many small msgs). sometimes the
sender wants to abort and start a new blob before the current load process
has completed - meaning the consumer's offsets are not yet caught up.

4. offsets outside of kafka: yes, you could force applications to store
their offsets twice, but thats inefficient. its better to expose a raw,
simple API and let such applications manage their own clean-up logic (this
again ties into 1.4 and no "one size fits all" solution)

On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <li...@gmail.com> wrote:

> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com> wrote:
> >
> > > Hey Ewen,
> > >
> > > Thanks for the review. As Radai explained, it would be complex in terms
> > of
> > > user configuration if we were to use committed offset to decide data
> > > deletion. We need a way to specify which groups need to consume data of
> > > this partition. The broker will also need to consume the entire offsets
> > > topic in that approach which has some overhead. I don't think it is
> that
> > > hard to implement. But it will likely take more time to discuss that
> > > approach due to the new config and the server side overhead.
> > >
> > > We choose to put this API in AdminClient because the API is more like
> an
> > > administrative operation (such as listGroups, deleteTopics) than a
> > consumer
> > > operation. It is not necessarily called by consumer only. For example,
> we
> > > can implement the "delete data before committed offset" approach by
> > running
> > > an external service which calls purgeDataBefore() API based on
> committed
> > > offset of consumer groups.
> > >
> > > I am not aware that AdminClient is not a public API. Suppose it is not
> > > public now, I assume we plan to make it public in the future as part of
> > > KIP-4. Are we not making it public because its interface is not stable?
> > If
> > > so, can we just tag this new API as not stable in the code?
> > >
> >
> >
> > The AdminClient planned for KIP-4 is a new Java-based implementation.
> It's
> > definitely confusing that both will be (could be?) named AdminClient, but
> > we've kept the existing Scala AdminClient out of the public API and have
> > not required KIPs for changes to it.
> >
> > That said, I agree that if this type of API makes it into Kafka, having a
> > (new, Java-based) AdminClient method would definitely be a good idea. An
> > alternative path might be to have a Consumer-based implementation since
> > that seems like a very intuitive, natural way to use the protocol. I
> think
> > optimizing for the expected use case would be a good idea.
> >
> > -Ewen
> >
> > Are you saying that the Scala AdminClient is not a public API and we
> discourage addition of any new feature to this class?
>
> I still prefer to add it to AdminClient (Java version in the future and
> Scala version in the short team) because I feel it belongs to admin
> operation instead of KafkaConsumer interface. For example, if in the future
> we implement the "delete data before committed offset" strategy in an
> external service, I feel it is a bit awkward if the service has to
> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to
> purge data. In other words, our expected use-case doesn't necessarily bind
> this API with consumer.
>
> I am not strong on this issue. Let's see what other committers/developers
> think about this.
>
>
> >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <
> ewen@confluent.io
> > >
> > > wrote:
> > >
> > > > Dong,
> > > >
> > > > Looks like that's an internal link,
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > > is the right one.
> > > >
> > > > I have a question about one of the rejected alternatives:
> > > >
> > > > > Using committed offset instead of an extra API to trigger data
> purge
> > > > operation.
> > > >
> > > > The KIP says this would be more complicated to implement. Why is
> that?
> > I
> > > > think brokers would have to consume the entire offsets topic, but the
> > > data
> > > > stored in memory doesn't seem to change and applying this when
> updated
> > > > offsets are seen seems basically the same. It might also be possible
> to
> > > > make it work even with multiple consumer groups if that was desired
> > > > (although that'd require tracking more data in memory) as a
> > > generalization
> > > > without requiring coordination between the consumer groups. Given the
> > > > motivation, I'm assuming this was considered unnecessary since this
> > > > specifically targets intermediate stream processing topics.
> > > >
> > > > Another question is why expose this via AdminClient (which isn't
> public
> > > API
> > > > afaik)? Why not, for example, expose it on the Consumer, which is
> > > > presumably where you'd want access to it since the functionality
> > depends
> > > on
> > > > the consumer actually having consumed the data?
> > > >
> > > > -Ewen
> > > >
> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com>
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
> > > > > AdminClient.
> > > > >
> > > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> > > > design+proposal.
> > > > > We
> > > > > would love to hear your comments and suggestions.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com> wrote:
>
> > Hey Ewen,
> >
> > Thanks for the review. As Radai explained, it would be complex in terms
> of
> > user configuration if we were to use committed offset to decide data
> > deletion. We need a way to specify which groups need to consume data of
> > this partition. The broker will also need to consume the entire offsets
> > topic in that approach which has some overhead. I don't think it is that
> > hard to implement. But it will likely take more time to discuss that
> > approach due to the new config and the server side overhead.
> >
> > We choose to put this API in AdminClient because the API is more like an
> > administrative operation (such as listGroups, deleteTopics) than a
> consumer
> > operation. It is not necessarily called by consumer only. For example, we
> > can implement the "delete data before committed offset" approach by
> running
> > an external service which calls purgeDataBefore() API based on committed
> > offset of consumer groups.
> >
> > I am not aware that AdminClient is not a public API. Suppose it is not
> > public now, I assume we plan to make it public in the future as part of
> > KIP-4. Are we not making it public because its interface is not stable?
> If
> > so, can we just tag this new API as not stable in the code?
> >
>
>
> The AdminClient planned for KIP-4 is a new Java-based implementation. It's
> definitely confusing that both will be (could be?) named AdminClient, but
> we've kept the existing Scala AdminClient out of the public API and have
> not required KIPs for changes to it.
>
> That said, I agree that if this type of API makes it into Kafka, having a
> (new, Java-based) AdminClient method would definitely be a good idea. An
> alternative path might be to have a Consumer-based implementation since
> that seems like a very intuitive, natural way to use the protocol. I think
> optimizing for the expected use case would be a good idea.
>
> -Ewen
>
> Are you saying that the Scala AdminClient is not a public API and we
discourage addition of any new feature to this class?

I still prefer to add it to AdminClient (Java version in the future and
Scala version in the short team) because I feel it belongs to admin
operation instead of KafkaConsumer interface. For example, if in the future
we implement the "delete data before committed offset" strategy in an
external service, I feel it is a bit awkward if the service has to
instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to
purge data. In other words, our expected use-case doesn't necessarily bind
this API with consumer.

I am not strong on this issue. Let's see what other committers/developers
think about this.


>
> >
> > Thanks,
> > Dong
> >
> > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io
> >
> > wrote:
> >
> > > Dong,
> > >
> > > Looks like that's an internal link,
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > is the right one.
> > >
> > > I have a question about one of the rejected alternatives:
> > >
> > > > Using committed offset instead of an extra API to trigger data purge
> > > operation.
> > >
> > > The KIP says this would be more complicated to implement. Why is that?
> I
> > > think brokers would have to consume the entire offsets topic, but the
> > data
> > > stored in memory doesn't seem to change and applying this when updated
> > > offsets are seen seems basically the same. It might also be possible to
> > > make it work even with multiple consumer groups if that was desired
> > > (although that'd require tracking more data in memory) as a
> > generalization
> > > without requiring coordination between the consumer groups. Given the
> > > motivation, I'm assuming this was considered unnecessary since this
> > > specifically targets intermediate stream processing topics.
> > >
> > > Another question is why expose this via AdminClient (which isn't public
> > API
> > > afaik)? Why not, for example, expose it on the Consumer, which is
> > > presumably where you'd want access to it since the functionality
> depends
> > on
> > > the consumer actually having consumed the data?
> > >
> > > -Ewen
> > >
> > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > We created KIP-107 to propose addition of purgeDataBefore() API in
> > > > AdminClient.
> > > >
> > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> > > design+proposal.
> > > > We
> > > > would love to hear your comments and suggestions.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <li...@gmail.com> wrote:

> Hey Ewen,
>
> Thanks for the review. As Radai explained, it would be complex in terms of
> user configuration if we were to use committed offset to decide data
> deletion. We need a way to specify which groups need to consume data of
> this partition. The broker will also need to consume the entire offsets
> topic in that approach which has some overhead. I don't think it is that
> hard to implement. But it will likely take more time to discuss that
> approach due to the new config and the server side overhead.
>
> We choose to put this API in AdminClient because the API is more like an
> administrative operation (such as listGroups, deleteTopics) than a consumer
> operation. It is not necessarily called by consumer only. For example, we
> can implement the "delete data before committed offset" approach by running
> an external service which calls purgeDataBefore() API based on committed
> offset of consumer groups.
>
> I am not aware that AdminClient is not a public API. Suppose it is not
> public now, I assume we plan to make it public in the future as part of
> KIP-4. Are we not making it public because its interface is not stable? If
> so, can we just tag this new API as not stable in the code?
>


The AdminClient planned for KIP-4 is a new Java-based implementation. It's
definitely confusing that both will be (could be?) named AdminClient, but
we've kept the existing Scala AdminClient out of the public API and have
not required KIPs for changes to it.

That said, I agree that if this type of API makes it into Kafka, having a
(new, Java-based) AdminClient method would definitely be a good idea. An
alternative path might be to have a Consumer-based implementation since
that seems like a very intuitive, natural way to use the protocol. I think
optimizing for the expected use case would be a good idea.

-Ewen


>
> Thanks,
> Dong
>
> On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
> > Dong,
> >
> > Looks like that's an internal link,
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> > 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > is the right one.
> >
> > I have a question about one of the rejected alternatives:
> >
> > > Using committed offset instead of an extra API to trigger data purge
> > operation.
> >
> > The KIP says this would be more complicated to implement. Why is that? I
> > think brokers would have to consume the entire offsets topic, but the
> data
> > stored in memory doesn't seem to change and applying this when updated
> > offsets are seen seems basically the same. It might also be possible to
> > make it work even with multiple consumer groups if that was desired
> > (although that'd require tracking more data in memory) as a
> generalization
> > without requiring coordination between the consumer groups. Given the
> > motivation, I'm assuming this was considered unnecessary since this
> > specifically targets intermediate stream processing topics.
> >
> > Another question is why expose this via AdminClient (which isn't public
> API
> > afaik)? Why not, for example, expose it on the Consumer, which is
> > presumably where you'd want access to it since the functionality depends
> on
> > the consumer actually having consumed the data?
> >
> > -Ewen
> >
> > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > We created KIP-107 to propose addition of purgeDataBefore() API in
> > > AdminClient.
> > >
> > > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> > design+proposal.
> > > We
> > > would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > > Dong
> > >
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Dong Lin <li...@gmail.com>.
Hey Ewen,

Thanks for the review. As Radai explained, it would be complex in terms of
user configuration if we were to use committed offset to decide data
deletion. We need a way to specify which groups need to consume data of
this partition. The broker will also need to consume the entire offsets
topic in that approach which has some overhead. I don't think it is that
hard to implement. But it will likely take more time to discuss that
approach due to the new config and the server side overhead.

We choose to put this API in AdminClient because the API is more like an
administrative operation (such as listGroups, deleteTopics) than a consumer
operation. It is not necessarily called by consumer only. For example, we
can implement the "delete data before committed offset" approach by running
an external service which calls purgeDataBefore() API based on committed
offset of consumer groups.

I am not aware that AdminClient is not a public API. Suppose it is not
public now, I assume we plan to make it public in the future as part of
KIP-4. Are we not making it public because its interface is not stable? If
so, can we just tag this new API as not stable in the code?

Thanks,
Dong

On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Dong,
>
> Looks like that's an internal link,
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> is the right one.
>
> I have a question about one of the rejected alternatives:
>
> > Using committed offset instead of an extra API to trigger data purge
> operation.
>
> The KIP says this would be more complicated to implement. Why is that? I
> think brokers would have to consume the entire offsets topic, but the data
> stored in memory doesn't seem to change and applying this when updated
> offsets are seen seems basically the same. It might also be possible to
> make it work even with multiple consumer groups if that was desired
> (although that'd require tracking more data in memory) as a generalization
> without requiring coordination between the consumer groups. Given the
> motivation, I'm assuming this was considered unnecessary since this
> specifically targets intermediate stream processing topics.
>
> Another question is why expose this via AdminClient (which isn't public API
> afaik)? Why not, for example, expose it on the Consumer, which is
> presumably where you'd want access to it since the functionality depends on
> the consumer actually having consumed the data?
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
>
> > Hi all,
> >
> > We created KIP-107 to propose addition of purgeDataBefore() API in
> > AdminClient.
> >
> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> design+proposal.
> > We
> > would love to hear your comments and suggestions.
> >
> > Thanks,
> > Dong
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Tue, Jan 3, 2017 at 5:30 PM, radai <ra...@gmail.com> wrote:

> also 4. some apps may do their own offset bookkeeping
>

This is definitely a fair point, but if you want aggressive cleanup of data
in Kafka, you can dual commit with the Kafka commit happening second. I
don't see how this would be a problem -- inconsistency isn't a problem
since "late" commits to Kafka would only affect how quickly data is cleaned
up. If we miss the offset commit to Kafka after committing offsets to some
other system, we'd just delay deleting data for a short time. (A great
example of taking advantage of this would be the HDFS connector for Kafka
Connect, which manages its own offsets, but where users might like to be
able to more aggressively clean up data once it has landed in HDFS. I'd
love to see support for this integrated in the HDFS connector.)

I don't think the proposed approach is a bad idea, I just want to
understand the space of design options and their tradeoffs.


>
> On Tue, Jan 3, 2017 at 5:29 PM, radai <ra...@gmail.com> wrote:
>
> > the issue with tracking committed offsets is whos offsets do you track?
> >
> > 1. some topics have multiple groups
>

Couldn't this go into the topic-level config? This is why I mentioned 1 vs
multiple groups in my earlier reply. 1 group keeps things simple wrt how
deciding if deleting log segments happens and would easily fit into a
topic-level config (I think it doesn't require additional state in memory
despite requiring consuming all __consumer_offsets partitions); multiple
groups complicates how the config is specified and how the state to
determine if we can delete a log segment is tracked. That said, I don't see
a fundamental reason we couldn't support multiple consumer groups per topic.


> > 2. some "groups" are really one-offs like developers spinning up console
> > consumer "just to see if there's data"
>

This seems very counter to the motivating use case in the KIP for
intermediate stream processing topics? The stated use case is for stream
processing apps where, presumably, there would be a single, fixed,
deterministically named consumer for the data?


> > 3. there are use cases where you want to deliberately "wipe" data EVEN IF
> > its still being consumed
>

What are these use cases? Can we get them enumerated in the KIP so we
understand the use cases and conditions where this would happen? What are
the cases that wouldn't be covered by existing retention policies? The only
new type of policy proposed so far is based on whether data has been
consumed or not; is there something new besides a) time-based b) size-based
or c) consumed-based?


> >
> > #1 is a configuration mess, since there are multiple possible strategies.
> > #2 is problematic without a definition of "liveliness" or special
> handling
> > for console consumer? and #3 is flat out impossible with committed-offset
> > tracking
> >
> > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ewen@confluent.io
> >
> > wrote:
> >
> >> Dong,
> >>
> >> Looks like that's an internal link,
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
> >> 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> >> is the right one.
> >>
> >> I have a question about one of the rejected alternatives:
> >>
> >> > Using committed offset instead of an extra API to trigger data purge
> >> operation.
> >>
> >> The KIP says this would be more complicated to implement. Why is that? I
> >> think brokers would have to consume the entire offsets topic, but the
> data
> >> stored in memory doesn't seem to change and applying this when updated
> >> offsets are seen seems basically the same. It might also be possible to
> >> make it work even with multiple consumer groups if that was desired
> >> (although that'd require tracking more data in memory) as a
> generalization
> >> without requiring coordination between the consumer groups. Given the
> >> motivation, I'm assuming this was considered unnecessary since this
> >> specifically targets intermediate stream processing topics.
> >>
> >> Another question is why expose this via AdminClient (which isn't public
> >> API
> >> afaik)? Why not, for example, expose it on the Consumer, which is
> >> presumably where you'd want access to it since the functionality depends
> >> on
> >> the consumer actually having consumed the data?
> >>
> >> -Ewen
> >>
> >> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
> >>
> >> > Hi all,
> >> >
> >> > We created KIP-107 to propose addition of purgeDataBefore() API in
> >> > AdminClient.
> >> >
> >> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> >> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
> >> design+proposal.
> >> > We
> >> > would love to hear your comments and suggestions.
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >>
> >
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by radai <ra...@gmail.com>.
also 4. some apps may do their own offset bookkeeping

On Tue, Jan 3, 2017 at 5:29 PM, radai <ra...@gmail.com> wrote:

> the issue with tracking committed offsets is whos offsets do you track?
>
> 1. some topics have multiple groups
> 2. some "groups" are really one-offs like developers spinning up console
> consumer "just to see if there's data"
> 3. there are use cases where you want to deliberately "wipe" data EVEN IF
> its still being consumed
>
> #1 is a configuration mess, since there are multiple possible strategies.
> #2 is problematic without a definition of "liveliness" or special handling
> for console consumer? and #3 is flat out impossible with committed-offset
> tracking
>
> On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
>> Dong,
>>
>> Looks like that's an internal link,
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%
>> 3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> is the right one.
>>
>> I have a question about one of the rejected alternatives:
>>
>> > Using committed offset instead of an extra API to trigger data purge
>> operation.
>>
>> The KIP says this would be more complicated to implement. Why is that? I
>> think brokers would have to consume the entire offsets topic, but the data
>> stored in memory doesn't seem to change and applying this when updated
>> offsets are seen seems basically the same. It might also be possible to
>> make it work even with multiple consumer groups if that was desired
>> (although that'd require tracking more data in memory) as a generalization
>> without requiring coordination between the consumer groups. Given the
>> motivation, I'm assuming this was considered unnecessary since this
>> specifically targets intermediate stream processing topics.
>>
>> Another question is why expose this via AdminClient (which isn't public
>> API
>> afaik)? Why not, for example, expose it on the Consumer, which is
>> presumably where you'd want access to it since the functionality depends
>> on
>> the consumer actually having consumed the data?
>>
>> -Ewen
>>
>> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > We created KIP-107 to propose addition of purgeDataBefore() API in
>> > AdminClient.
>> >
>> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
>> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+
>> design+proposal.
>> > We
>> > would love to hear your comments and suggestions.
>> >
>> > Thanks,
>> > Dong
>> >
>>
>
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by radai <ra...@gmail.com>.
the issue with tracking committed offsets is whos offsets do you track?

1. some topics have multiple groups
2. some "groups" are really one-offs like developers spinning up console
consumer "just to see if there's data"
3. there are use cases where you want to deliberately "wipe" data EVEN IF
its still being consumed

#1 is a configuration mess, since there are multiple possible strategies.
#2 is problematic without a definition of "liveliness" or special handling
for console consumer? and #3 is flat out impossible with committed-offset
tracking

On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Dong,
>
> Looks like that's an internal link,
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> is the right one.
>
> I have a question about one of the rejected alternatives:
>
> > Using committed offset instead of an extra API to trigger data purge
> operation.
>
> The KIP says this would be more complicated to implement. Why is that? I
> think brokers would have to consume the entire offsets topic, but the data
> stored in memory doesn't seem to change and applying this when updated
> offsets are seen seems basically the same. It might also be possible to
> make it work even with multiple consumer groups if that was desired
> (although that'd require tracking more data in memory) as a generalization
> without requiring coordination between the consumer groups. Given the
> motivation, I'm assuming this was considered unnecessary since this
> specifically targets intermediate stream processing topics.
>
> Another question is why expose this via AdminClient (which isn't public API
> afaik)? Why not, for example, expose it on the Consumer, which is
> presumably where you'd want access to it since the functionality depends on
> the consumer actually having consumed the data?
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:
>
> > Hi all,
> >
> > We created KIP-107 to propose addition of purgeDataBefore() API in
> > AdminClient.
> >
> > Please find the KIP wiki in the link https://iwww.corp.linkedin.
> > com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+
> API+design+proposal.
> > We
> > would love to hear your comments and suggestions.
> >
> > Thanks,
> > Dong
> >
>

Re: [DISCUSS] KIP-107: Add purgeDataBefore() API in AdminClient

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Dong,

Looks like that's an internal link,
https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
is the right one.

I have a question about one of the rejected alternatives:

> Using committed offset instead of an extra API to trigger data purge
operation.

The KIP says this would be more complicated to implement. Why is that? I
think brokers would have to consume the entire offsets topic, but the data
stored in memory doesn't seem to change and applying this when updated
offsets are seen seems basically the same. It might also be possible to
make it work even with multiple consumer groups if that was desired
(although that'd require tracking more data in memory) as a generalization
without requiring coordination between the consumer groups. Given the
motivation, I'm assuming this was considered unnecessary since this
specifically targets intermediate stream processing topics.

Another question is why expose this via AdminClient (which isn't public API
afaik)? Why not, for example, expose it on the Consumer, which is
presumably where you'd want access to it since the functionality depends on
the consumer actually having consumed the data?

-Ewen

On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <li...@gmail.com> wrote:

> Hi all,
>
> We created KIP-107 to propose addition of purgeDataBefore() API in
> AdminClient.
>
> Please find the KIP wiki in the link https://iwww.corp.linkedin.
> com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> We
> would love to hear your comments and suggestions.
>
> Thanks,
> Dong
>