Posted to dev@kafka.apache.org by Bill Warshaw <wd...@gmail.com> on 2016/10/05 20:55:53 UTC

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Bumping for visibility.  KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
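For context while reading the thread below: the KIP proposes a topic-level log.retention.min.timestamp setting, under which a log segment becomes eligible for deletion once all of its messages are older than the configured minimum. A minimal sketch of that rule (illustrative Python, not the actual broker code; the segment layout and field names are assumptions):

```python
# Toy model of timestamp-based log deletion: a segment is deletable once
# its largest message timestamp falls below log.retention.min.timestamp.
def deletable_segments(segments, retention_min_timestamp):
    """Return the segments whose newest message is older than the cutoff."""
    return [s for s in segments
            if s["largest_timestamp"] < retention_min_timestamp]

segments = [
    {"base_offset": 0,   "largest_timestamp": 1000},
    {"base_offset": 100, "largest_timestamp": 2000},
    {"base_offset": 200, "largest_timestamp": 1500},  # late-arriving data
]

# Note the hazard Jun raises later in the thread: the third segment holds
# late data older than the cutoff, so it is deletable immediately even
# though it was published last.
print([s["base_offset"] for s in deletable_segments(segments, 1800)])  # [0, 200]
```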

On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com> wrote:

> Hello Guozhang,
>
> KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new deletion
> policy (minimum timestamp), while KIP-71 is allowing deletion and
> compaction to coexist.
>
> They both will touch LogManager, but the change for KIP-47 is very
> isolated.
>
> On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com> wrote:
>
> Hi Bill,
>
> I would like to check whether there is any correlation between this KIP
> and KIP-71
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
>
> I feel they are orthogonal but would like to double check with you.
>
>
> Guozhang
>
>
> On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com> wrote:
>
> > I'd like to re-awaken this voting thread now that KIP-33 has merged.
> > This KIP is now completely unblocked.  I have a working branch off of
> > trunk with my proposed fix, including testing.
> >
> > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Jay, Bill:
> > >
> > > I'm thinking of one general use case for using timestamp rather than
> > > offset for log deletion: expiration handling in data replication.
> > > When the source data store decides to expire some data records based
> > > on their timestamps, today we need to configure the corresponding
> > > Kafka changelog topic for compaction and actively send a tombstone
> > > for each expired record. Since expiration usually happens for a batch
> > > of records, this can generate heavy tombstone traffic. For example, I
> > > think LI's data replication for Espresso is seeing similar issues,
> > > and they are just not sending tombstones at all.
> > >
> > > With a timestamp-based log deletion policy, this can be handled
> > > simply by setting the current expiration timestamp; ideally, though,
> > > one would prefer to configure such a topic with both log compaction
> > > and log deletion enabled. From that point of view, I feel the current
> > > KIP still has value and should be accepted.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wd...@gmail.com> wrote:
> > >
> > > > Yes, I'd agree that offset is a more precise configuration than
> > > > timestamp.  If there were a way to set a partition-level
> > > > configuration, I would rather use log.retention.min.offset than
> > > > timestamp.  If you have an approach in mind, I'd be open to
> > > > investigating it.
> > > >
> > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io> wrote:
> > > >
> > > > > Gotcha, good point. But barring that limitation, you agree that
> > > > > that makes more sense?
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <wd...@gmail.com> wrote:
> > > > >
> > > > > > The problem with offset as a config option is that offsets are
> > > > > > partition-specific, so we'd need a per-partition config.  This
> > > > > > would work for our particular use case, where we have
> > > > > > single-partition topics, but for multiple-partition topics it
> > > > > > would delete from all partitions based on a global topic-level
> > > > > > offset.
> > > > > >
> > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <ja...@confluent.io> wrote:
> > > > > >
> > > > > > > I think you are saying you considered a kind of trim() api
> > > > > > > that would synchronously chop off the tail of the log starting
> > > > > > > from a given offset.  That would be one option, but what I was
> > > > > > > saying was slightly different: in the proposal you have, where
> > > > > > > there is a config that controls retention that the user would
> > > > > > > update, wouldn't it make more sense for this config to be
> > > > > > > based on offset rather than timestamp?
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <wdwarshaw@gmail.com> wrote:
> > > > > > >
> > > > > > > > 1.  Initially I looked at using the actual offset, by
> > > > > > > > adding a call to AdminUtils to just delete anything in a
> > > > > > > > given topic/partition up to a given offset.  I ran into a
> > > > > > > > lot of trouble trying to work out how the system would
> > > > > > > > recognize that every broker had successfully deleted that
> > > > > > > > range from the partition before returning to the client.
> > > > > > > > If we were ok treating this as a completely asynchronous
> > > > > > > > operation, I would be open to revisiting this approach.
> > > > > > > >
> > > > > > > > 2.  For our use case, we would be updating the config every
> > > > > > > > few hours for a given topic, and there would not be a
> > > > > > > > sizable number of consumers.  I imagine this would not
> > > > > > > > scale well if someone were adjusting this config very
> > > > > > > > frequently on a large system, but I don't know if there are
> > > > > > > > any use cases where that would occur.  I imagine most use
> > > > > > > > cases would involve truncating the log after taking a
> > > > > > > > snapshot or doing some other expensive operation that
> > > > > > > > didn't occur very frequently.
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <ja...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Two comments:
> > > > > > > > >
> > > > > > > > >    1. Is there a reason to use physical time rather than
> > > > > > > > >    offset? The idea is for the consumer to say when it has
> > > > > > > > >    consumed something so it can be deleted, right? It
> > > > > > > > >    seems like offset would be a much more precise way to
> > > > > > > > >    do this--i.e. the consumer says "I have checkpointed
> > > > > > > > >    state up to offset X; you can get rid of anything prior
> > > > > > > > >    to that". Doing this by timestamp seems like it is just
> > > > > > > > >    more error prone...
> > > > > > > > >    2. Is this mechanism practical to use at scale? It
> > > > > > > > >    requires several ZK writes per config change, so I
> > > > > > > > >    guess that depends on how frequently the consumers
> > > > > > > > >    would update the value and how many consumers there
> > > > > > > > >    are...any thoughts on this?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <wdwarshaw@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
> > > > > > > > > > KIP-33 has been accepted and is in-progress.  I've
> > > > > > > > > > updated the KIP (
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > ).
> > > > > > > > > > I have a commit with the functionality for KIP-47 ready
> > > > > > > > > > to go once KIP-33 is complete; it's a fairly minor
> > > > > > > > > > change.
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <gwen@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > >
> > > > > > > > > > > Do you mind updating the KIP with the time formats we
> > > > > > > > > > > plan on supporting in the configuration?
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <wdwarshaw@gmail.com> wrote:
> > > > > > > > > > > > Hello,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
>
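Bill's objection above — that a single topic-level offset cannot safely drive deletion across partitions — can be illustrated with a toy model (all names and numbers here are hypothetical):

```python
# Offsets are independent per partition, so one global cutoff behaves
# differently in each partition of the same topic.
partitions = {0: 500, 1: 120, 2: 900}  # log end offset per partition

def trim_points(global_min_offset):
    """Offset each partition would be trimmed to under a global cutoff."""
    return {p: min(global_min_offset, end) for p, end in partitions.items()}

# A global cutoff of 400 trims partitions 0 and 2 to offset 400, but
# empties partition 1 entirely (its log ends at offset 120).
print(trim_points(400))
```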

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Jun Rao <ju...@confluent.io>.
Bill,

That's a good question. I am thinking of the following approach for
implementing trim(): (1) the client issues a metadata request to a broker
to determine the leader of each topic/partition, and groups
topic/partitions by leader broker; (2) the client sends a TrimRequest to
each broker for the partitions whose leader is on that broker; (3) the
leader trims the corresponding segments locally; (4) we extend the fetch
request/response protocol so that the leader propagates a firstOffset
(first available offset) for each partition in the follower fetch
response; (5) the follower trims its local segments according to
firstOffset in the fetch response. We have been talking about adding an
admin client as part of KIP-4; we could add such a trim() method to that
admin client.

Thanks,

Jun
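Steps (1) and (2) of the outline above can be sketched as follows; the metadata map is mocked and the per-broker TrimRequest batch is shown as a plain dict — none of this is actual Kafka client code:

```python
from collections import defaultdict

# Mocked metadata: (topic, partition) -> leader broker id. In the real
# flow this would come from a metadata request to any broker.
leaders = {("events", 0): 1, ("events", 1): 2, ("events", 2): 1}

def group_by_leader(offsets_to_truncate):
    """Bucket partitions by leader so one TrimRequest goes to each broker."""
    per_broker = defaultdict(dict)
    for tp, offset in offsets_to_truncate.items():
        per_broker[leaders[tp]][tp] = offset
    return dict(per_broker)

requests = group_by_leader({("events", 0): 100,
                            ("events", 1): 50,
                            ("events", 2): 75})
print(requests)
```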


On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw <wd...@gmail.com> wrote:

> Hi Jun,
>
> Those are valid concerns.  For our particular use case, application events
> triggering the timestamp update will never occur more than once an hour,
> and we maintain a sliding window so that we don't delete messages too close
> to what our consumers may be reading.
> For more general use cases, developers will need to be aware of these
> issues, and would need to write their application code with that in mind.
>
>
> To your second point: I initially wanted to just have a trim() admin api.
> I started implementing it, but ran into difficulties with synchronously
> acknowledging to the calling code that all brokers had truncated the given
> partitions.  It seemed like we would have to do something similar to how
> topic deletion is implemented, where the initial broker uses Zookeeper to
> coordinate the deletion on the other brokers.  If you have a simpler idea
> in mind, I'd be happy to update this KIP to provide a trim() api instead.
>
> On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Bill,
> >
> > Thanks for the proposal. Sorry for the late reply.
> >
> > The motivation of the proposal makes sense: don't delete the messages
> > until the application tells you so.
> >
> > I am wondering if the current proposal is the best way to address the
> > need, though. There are a couple of issues that I see with the
> > proposal. (1) Messages in the log may not always be stored in
> > increasing timestamp order. Suppose that the application sets
> > log.retention.min.timestamp to T, and after that, messages with
> > timestamps older than T (either due to delay or reprocessing) are
> > published to that topic. Those newly published messages are likely to
> > be deleted immediately, before the application gets a chance to read
> > them, which is probably not what the application wants. (2) The
> > configuration for the topic has to be changed continuously to
> > implement the use case. Intuitively, one probably shouldn't be
> > changing a configuration all the time.
> >
> > Another way to achieve the goal is what Jay mentioned earlier. We
> > could add a trim() api like the following that will trim the log up to
> > the specified offsets. This addresses both of the issues I mentioned
> > above. Will that work for you?
> >
> > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> >
> > Thanks,
> >
> > Jun
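Broker-side, the proposed trim(Map<TopicPartition, Long>) would amount to dropping whole segments that fall entirely below the requested offset, much like time- and size-based retention do today. A toy model of that step (illustrative only, not the broker implementation):

```python
def trim(segments, truncate_offset):
    """Keep only segments containing at least one offset >= truncate_offset.

    Trimming works at segment granularity: a segment is dropped only when
    its last offset is below the cutoff, so a few records older than the
    cutoff may survive in the first retained segment.
    """
    return [s for s in segments if s["last_offset"] >= truncate_offset]

segments = [
    {"base_offset": 0,   "last_offset": 99},
    {"base_offset": 100, "last_offset": 199},
    {"base_offset": 200, "last_offset": 299},
]
print([s["base_offset"] for s in trim(segments, 150)])  # [100, 200]
```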

[VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Joel Koshy <jj...@gmail.com>.
>
> - It seems that the consumer will need to write log.retention.min.timestamp
> periodically to zookeeper as dynamic configuration of the topic, so that
> broker can pick up log.retention.min.timestamp. However, this introduces
> dependency of consumer on zookeeper which is undesirable. Note that we have
>

We will be eliminating the need for manipulating topic configs directly in
ZK with the admin APIs in KIP-4
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ConfigAdminSchema>


> log.retention.min.timestamp. However, it is not clear how a client
> application can set log.retention.min.timestamp to address the
> use-case. For example, if there is more than one consumer in the
> consumer group, which consumer(s) write log.retention.min.timestamp to
> zookeeper?

How does the consumer determine the value of log.retention.min.timestamp?


I don't quite see the issue here: this is really up to the application to
handle and applies to the trim() approach as well.

>
> this KIP. And a malicious or misconfigured client can easily delete all
> messages of any topic. How do we address this problem so that operators
> won't have to worry about it?
>

The admin APIs are Kafka RPCs that can all be authorized.

> BTW, I like Jun's solution of using offsets and IMO it works. Jun's
> solution would also address some of the problems above. Some ideas
> discussed in the KIP-68 thread may help address some of them as well.
>

I agree - the trim() approach sidesteps the timestamp issue by dealing
with offsets directly (unless the user explicitly opts to look up offsets
by timestamp). Once we are convinced that this simpler approach can
satisfy the motivation for both this KIP and KIP-68, we should probably
consolidate them as use-cases of a new KIP for the trim() API.
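The "look up offsets by timestamp" opt-in can be made concrete: an application that thinks in timestamps resolves them to offsets once, then hands plain offsets to trim(). A mocked sketch (the time index and lookup are illustrative stand-ins, not a real Kafka API call):

```python
# Mocked per-partition time index: (timestamp, offset) pairs sorted by
# timestamp, standing in for an offsets-for-times style lookup.
index = {
    ("events", 0): [(1000, 0), (2000, 50), (3000, 120)],
    ("events", 1): [(1500, 0), (2500, 40)],
}

def offsets_for_time(ts):
    """Per partition, the first offset whose timestamp is >= ts."""
    result = {}
    for tp, entries in index.items():
        for timestamp, offset in entries:
            if timestamp >= ts:
                result[tp] = offset
                break
    return result

# Resolve the timestamp once; the trim() call itself sees only offsets.
print(offsets_for_time(2000))
```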


> On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw <wdwarshaw@gmail.com
> <javascript:_e(%7B%7D,'cvml','wdwarshaw@gmail.com');>> wrote:
>
> > Hi Jun,
> >
> > Those are valid concerns.  For our particular use case, application
> events
> > triggering the timestamp update will never occur more than once an hour,
> > and we maintain a sliding window so that we don't delete messages too
> close
> > to what our consumers may be reading.
> > For more general use cases, developers will need to be aware of these
> > issues, and would need to write their application code with that in mind.
> >
> >
> > To your second point: I initially wanted to just have a trim() admin api.
> > I started implementing it, but ran into difficulties with synchronously
> > acknowledging to the calling code that all brokers had truncated the
> given
> > partitions.  It seemed like we would have to do something similar to how
> > topic deletion is implemented, where the initial broker uses Zookeeper to
> > coordinate the deletion on the other brokers.  If you have a simpler idea
> > in mind, I'd be happy to update this KIP to provide a trim() api instead.
> >
> > On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <jun@confluent.io
> <javascript:_e(%7B%7D,'cvml','jun@confluent.io');>> wrote:
> >
> > > Hi, Bill,
> > >
> > > Thanks for the proposal. Sorry for the late reply.
> > >
> > > The motivation of the proposal makes sense: don't delete the messages
> > until
> > > the application tells you so.
> > >
> > > I am wondering if the current proposal is the best way to address the
> > need
> > > though. There are couple of issues that I saw with the proposal. (1)
> > > Messages in the log may not always be stored in increasing timestamp
> > order.
> > > Suppose that the application sets log.retention.min.timestamp to T and
> > > after that messages with timestamp older than T ((either due to delay
> or
> > > reprocessing) are published to that topic. Those newly published
> messages
> > > are likely going to be deleted immediately before the application gets
> a
> > > chance to read them, which is probably not what the application wants.
> > (2)
> > > The configuration for the topic has to be changed continuously to
> > implement
> > > the use case. Intuitively, one probably shouldn't be changing a
> > > configuration all the time.
> > >
> > > Another way to achieve the goal is what Jay mentioned earlier. We could
> > add
> > > a trim() api like the following that will trim the log up to the
> > specified
> > > offsets. This addresses both of the above issues that I mentioned. Will
> > > that work for you?
> > >
> > > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wdwarshaw@gmail.com
> <javascript:_e(%7B%7D,'cvml','wdwarshaw@gmail.com');>>
> > wrote:
> > >
> > > > Bumping for visibility.  KIP is here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > >
> > > > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wdwarshaw@gmail.com
> <javascript:_e(%7B%7D,'cvml','wdwarshaw@gmail.com');>>
> > > wrote:
> > > >
> > > > > Hello Guozhang,
> > > > >
> > > > > KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new
> > > deletion
> > > > > policy (minimum timestamp), while KIP-71 is allowing deletion and
> > > > > compaction to coexist.
> > > > >
> > > > > They both will touch LogManager, but the change for KIP-47 is very
> > > > > isolated.
> > > > >
> > > > > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wangguoz@gmail.com
> <javascript:_e(%7B%7D,'cvml','wangguoz@gmail.com');>>
> > > > wrote:
> > > > >
> > > > > Hi Bill,
> > > > >
> > > > > I would like to reason if there is any correlation between this KIP
> > and
> > > > > KIP-71
> > > > >
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > > > >
> > > > > I feel they are orthogonal but would like to double check with you.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <
> wdwarshaw@gmail.com <javascript:_e(%7B%7D,'cvml','wdwarshaw@gmail.com');>>
> > > > > wrote:
> > > > >
> > > > > > I'd like to re-awaken this voting thread now that KIP-33 has
> > merged.
> > > > > This
> > > > > > KIP is now completely unblocked.  I have a working branch off of
> > > trunk
> > > > > with
> > > > > > my proposed fix, including testing.
> > > > > >
> > > > > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wangguoz@gmail.com
> <javascript:_e(%7B%7D,'cvml','wangguoz@gmail.com');>>
> > > > wrote:
> > > > > >
> > > > > > > Jay, Bill:
> > > > > > >
> > > > > > > I'm thinking of one general use case of using timestamp rather
> > than
> > > > > > offset
> > > > > > > for log deletion, which is that for expiration handling in data
> > > > > > > replication, when the source data store decides to expire some
> > data
> > > > > > records
> > > > > > > based on their timestamps, today we need to configure the
> > > > corresponding
> > > > > > > Kafka changelog topic for compaction, and actively send a
> > tombstone
> > > > for
> > > > > > > each expired record. Since expiration usually happens with a
> > bunch
> > > of
> > > > > > > records, this could generate large tombstone traffic. For
> > example I
> > > > > think
> > > > > > > LI's data replication for Espresso is seeing similar issues and
> > > they
> > > > > are
> > > > > > > just not sending tombstone at all.
> > > > > > >
> > > > > > > With timestamp based log deletion policy, this can be easily
> > > handled
> > > > by
> > > > > > > simply setting the current expiration timestamp; but ideally
> one
> > > > would
> > > > > > > prefer to configure this topic to be both log compaction
> enabled
> > as
> > > > > well
> > > > > > as
> > > > > > > log deletion enabled. From that point of view, I feel that
> > current
> > > > KIP
> > > > > > > still has value to be accepted.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wdwarshaw@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Yes, I'd agree that offset is a more precise configuration
> than
> > > > > > > timestamp.
> > > > > > > > If there was a way to set a partition-level configuration, I
> > > would
> > > > > > rather
> > > > > > > > use log.retention.min.offset than timestamp.  If you have an
> > > > approach
> > > > > > in
> > > > > > > > mind I'd be open to investigating it.
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <jay@confluent.io>
> > > > wrote:
> > > > > > > >
> > > > > > > > > Gotcha, good point. But barring that limitation, you agree
> > that
> > > > > that
> > > > > > > > makes
> > > > > > > > > more sense?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <wdwarshaw@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > The problem with offset as a config option is that
> offsets
> > > are
> > > > > > > > > > partition-specific, so we'd need a per-partition config.
> > > This
> > > > > > would
> > > > > > > > work
> > > > > > > > > > for our particular use case, where we have
> single-partition
> > > > > topics,
> > > > > > > but
> > > > > > > > > for
> > > > > > > > > > multiple-partition topics it would delete from all
> > partitions
> > > > > based
> > > > > > > on
> > > > > > > > a
> > > > > > > > > > global topic-level offset.
> > > > > > > > > >
> > > > > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <jay@confluent.io>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I think you are saying you considered a kind of trim()
> > api
> > > > that
> > > > > > > would
> > > > > > > > > > > synchronously chop off the tail of the log starting
> from
> > a
> > > > > given
> > > > > > > > > offset.
> > > > > > > > > > > That would be one option, but what I was saying was
> > > slightly
> > > > > > > > different:
> > > > > > > > > > in
> > > > > > > > > > > the proposal you have where there is a config that
> > controls
> > > > > > > retention
> > > > > > > > > > that
> > > > > > > > > > > the user would update, wouldn't it make more sense for
> > this
> > > > > > config
> > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > based on offset rather than timestamp?
> > > > > > > > > > >
> > > > > > > > > > > -Jay
> > > > > > > > > > >
> > > > > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <wdwarshaw@gmail.com>
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > 1.  Initially I looked at using the actual offset, by
> > > > adding
> > > > > a
> > > > > > > call
> > > > > > > > > to
> > > > > > > > > > > > AdminUtils to just delete anything in a given
> > > > topic/partition
> > > > > > to
> > > > > > > a
> > > > > > > > > > given
> > > > > > > > > > > > offset.  I ran into a lot of trouble here trying to
> > work
> > > > out
> > > > > > how
> > > > > > > > the
> > > > > > > > > > > system
> > > > > > > > > > > > would recognize that every broker had successfully
> > > deleted
> > > > > that
> > > > > > > > range
> > > > > > > > > > > from
> > > > > > > > > > > > the partition before returning to the client.  If we
> > were
> > > > ok
> > > > > > > > treating
> > > > > > > > > > > this
> > > > > > > > > > > > as a completely asynchronous operation I would be
> open
> > to
> > > > > > > > revisiting
> > > > > > > > > > this
> > > > > > > > > > > > approach.
> > > > > > > > > > > >
> > > > > > > > > > > > 2.  For our use case, we would be updating the config
> > > every
> > > > > few
> > > > > > > > hours
> > > > > > > > > > > for a
> > > > > > > > > > > > given topic, and there would not be a sizable
> amount
> > of
> > > > > > > > > consumers.  I
> > > > > > > > > > > > imagine that this would not scale well if someone was
> > > > > adjusting
> > > > > > > > this
> > > > > > > > > > > config
> > > > > > > > > > > > very frequently on a large system, but I don't know
> if
> > > > there
> > > > > > are
> > > > > > > > any
> > > > > > > > > > use
> > > > > > > > > > > > cases where that would occur.  I imagine most use
> cases
> > > > would
> > > > > > > > involve
> > > > > > > > > > > > truncating the log after taking a snapshot or doing
> > some
> > > > > other
> > > > > > > > > > expensive
> > > > > > > > > > > > operation that didn't occur very frequently.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <jay@confluent.io>
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Two comments:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    1. Is there a reason to use physical time rather
> > > than
> > > > > > > offset?
> > > > > > > > > The
> > > > > > > > > > > idea
> > > > > > > > > > > > >    is for the consumer to say when it has consumed
> > > > > something
> > > > > > so
> > > > > > > > it
> > > > > > > > > > can
> > > > > > > > > > > be
> > > > > > > > > > > > >    deleted, right? It seems like offset would be a
> > much
> > > > > more
> > > > > > > > > precise
> > > > > > > > > > > way
> > > > > > > > > > > > > to do
> > > > > > > > > > > > >    this--i.e. the consumer says "I have
> checkpointed
> > > > state
> > > > > up
> > > > > > > to
> > > > > > > > > > > offset X
> > > > > > > > > > > > > you
> > > > > > > > > > > > >    can get rid of anything prior to that". Doing
> this
> > > by
> > > > > > > > timestamp
> > > > > > > > > > > seems
> > > > > > > > > > > > > like
> > > > > > > > > > > > >    it is just more error prone...
> > > > > > > > > > > > >    2. Is this mechanism practical to use at scale?
> It
> > > > > > requires
> > > > > > > > > > several
> > > > > > > > > > > ZK
> > > > > > > > > > > > >    writes per config change, so I guess that
> depends
> > on
> > > > how
> > > > > > > > > > frequently
> > > > > > > > > > > > the
> > > > > > > > > > > > >    consumers would update the value and how many
> > > > consumers
> > > > > > > there
> > > > > > > > > > > > are...any
> > > > > > > > > > > > >    thoughts on this?
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Jay
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <wdwarshaw@gmail.com>
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now
> > that
> > > > > KIP-33
> > > > > > > has
> > > > > > > > > > been
> > > > > > > > > > > > > > accepted and is in-progress.  I've updated the
> KIP
> > (
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > > ).
> > > > > > > > > > > > > > I have a commit with the functionality for KIP-47
> > > ready
> > > > > to
> > > > > > go
> > > > > > > > > once
> > > > > > > > > > > > KIP-33
> > > > > > > > > > > > > > is complete; it's a fairly minor change.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <gwen@confluent.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Do you mind updating the KIP with the time formats
> > we
> > > > plan
> > > > > > on
> > > > > > > > > > > supporting
> > > > > > > > > > > > > > > in the configuration?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <wdwarshaw@gmail.com>
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > > >
> > > >
> > >
> >
>



-- 
Sent from Gmail Mobile

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Dong Lin <li...@gmail.com>.
Hey Bill,

I have some follow up questions after Jun's questions:

- It seems that the consumer will need to periodically write
log.retention.min.timestamp to ZooKeeper as a dynamic topic configuration
so that the broker can pick it up. However, this introduces a dependency
of the consumer on ZooKeeper, which is undesirable. Note that we have
already done work to reduce clients' dependency on ZooKeeper by moving
committed-offset storage from ZooKeeper into the Kafka brokers.
Therefore, should this KIP instead add a new request type that lets the
client tell the broker the log.retention.min.timestamp?

- I share Jun's concern with timestamps. They mean we cannot tell users
"only messages with a timestamp smaller than log.retention.min.timestamp
will be deleted". It is important that we explicitly state the
expectations of any new API we provide, in this case the use of
log.retention.min.timestamp. So what expectation can we offer users,
e.g. "only messages with a timestamp smaller than
(log.retention.min.timestamp - 1 hour) will be deleted"?

- According to the motivation section, the use case of the KIP is to
allow Kafka to delete messages only after the client application
determines they are no longer needed. The KIP then suggests providing
log.retention.min.timestamp so that the Kafka broker will delete
messages older than that timestamp. However, it is not clear how the
client application should set log.retention.min.timestamp to address
this use case. For example, if there is more than one consumer in the
consumer group, which consumer(s) write log.retention.min.timestamp to
ZooKeeper? How does a consumer determine its value?

- I also have a concern about the safety of letting clients determine
when messages can be deleted. It looks like authentication is not
required for this KIP, so a malicious or misconfigured client could
easily delete all messages of any topic. How do we address this so that
operators won't have to worry about it?

BTW, I like Jun's solution of using offsets, and IMO it works. It would
also address some of the problems above, as would some of the ideas
discussed in the KIP-68 thread.

Dong
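
To make the semantics under debate concrete, here is a minimal,
self-contained sketch of timestamp-based deletion as KIP-47 proposes it.
All names here are illustrative assumptions, not actual Kafka internals:
a segment becomes deletable once every record in it is older than
log.retention.min.timestamp, and the active segment is never deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of KIP-47's proposed policy (illustrative; not Kafka's
// LogManager): a segment is eligible for deletion when its largest
// record timestamp is below log.retention.min.timestamp.
class TimestampRetentionSketch {
    static class Segment {
        final long baseOffset;
        final long largestTimestamp; // max record timestamp in this segment
        Segment(long baseOffset, long largestTimestamp) {
            this.baseOffset = baseOffset;
            this.largestTimestamp = largestTimestamp;
        }
    }

    // Returns the prefix of segments whose records are all older than
    // minTimestamp, stopping at the first segment that must be retained.
    static List<Segment> deletableSegments(List<Segment> segments, long minTimestamp) {
        List<Segment> deletable = new ArrayList<>();
        for (int i = 0; i < segments.size() - 1; i++) { // never the active segment
            Segment s = segments.get(i);
            if (s.largestTimestamp < minTimestamp) {
                deletable.add(s);
            } else {
                break; // later segments normally carry newer timestamps
            }
        }
        return deletable;
    }
}
```

Note how Jun's first concern shows up in this sketch: a rolled segment
filled with late-arriving, old-timestamped records becomes deletable
immediately, whether or not the application has read it.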

On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw <wd...@gmail.com> wrote:

> Hi Jun,
>
> Those are valid concerns.  For our particular use case, application events
> triggering the timestamp update will never occur more than once an hour,
> and we maintain a sliding window so that we don't delete messages too close
> to what our consumers may be reading.
> For more general use cases, developers will need to be aware of these
> issues, and would need to write their application code with that in mind.
>
>
> To your second point: I initially wanted to just have a trim() admin api.
> I started implementing it, but ran into difficulties with synchronously
> acknowledging to the calling code that all brokers had truncated the given
> partitions.  It seemed like we would have to do something similar to how
> topic deletion is implemented, where the initial broker uses Zookeeper to
> coordinate the deletion on the other brokers.  If you have a simpler idea
> in mind, I'd be happy to update this KIP to provide a trim() api instead.
>
> On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Bill,
> >
> > Thanks for the proposal. Sorry for the late reply.
> >
> > The motivation of the proposal makes sense: don't delete the messages
> until
> > the application tells you so.
> >
> > I am wondering if the current proposal is the best way to address the
> need
> > though. There are a couple of issues that I saw with the proposal. (1)
> > Messages in the log may not always be stored in increasing timestamp
> order.
> > Suppose that the application sets log.retention.min.timestamp to T and
> > after that messages with timestamp older than T (either due to delay or
> > reprocessing) are published to that topic. Those newly published messages
> > are likely going to be deleted immediately before the application gets a
> > chance to read them, which is probably not what the application wants.
> (2)
> > The configuration for the topic has to be changed continuously to
> implement
> > the use case. Intuitively, one probably shouldn't be changing a
> > configuration all the time.
> >
> > Another way to achieve the goal is what Jay mentioned earlier. We could
> add
> > a trim() api like the following that will trim the log up to the
> specified
> > offsets. This addresses both of the above issues that I mentioned. Will
> > that work for you?
> >
> > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wd...@gmail.com>
> wrote:
> >
> > > Bumping for visibility.  KIP is here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 47+-+Add+timestamp-based+log+deletion+policy
> > >
> > > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com>
> > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new
> > deletion
> > > > policy (minimum timestamp), while KIP-71 is allowing deletion and
> > > > compaction to coexist.
> > > >
> > > > They both will touch LogManager, but the change for KIP-47 is very
> > > > isolated.
> > > >
> > > > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > >
> > > > Hi Bill,
> > > >
> > > > I would like to reason if there is any correlation between this KIP
> and
> > > > KIP-71
> > > >
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > > >
> > > > I feel they are orthogonal but would like to double check with you.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com>
> > > > wrote:
> > > >
> > > > > I'd like to re-awaken this voting thread now that KIP-33 has
> merged.
> > > > This
> > > > > KIP is now completely unblocked.  I have a working branch off of
> > trunk
> > > > with
> > > > > my proposed fix, including testing.
> > > > >
> > > > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Jay, Bill:
> > > > > >
> > > > > > I'm thinking of one general use case of using timestamp rather
> than
> > > > > offset
> > > > > > for log deletion, which is that for expiration handling in data
> > > > > > replication, when the source data store decides to expire some
> data
> > > > > records
> > > > > > based on their timestamps, today we need to configure the
> > > corresponding
> > > > > > Kafka changelog topic for compaction, and actively send a
> tombstone
> > > for
> > > > > > each expired record. Since expiration usually happens with a
> bunch
> > of
> > > > > > records, this could generate large tombstone traffic. For
> example I
> > > > think
> > > > > > LI's data replication for Espresso is seeing similar issues and
> > they
> > > > are
> > > > > > just not sending tombstone at all.
> > > > > >
> > > > > > With timestamp based log deletion policy, this can be easily
> > handled
> > > by
> > > > > > simply setting the current expiration timestamp; but ideally one
> > > would
> > > > > > prefer to configure this topic to be both log compaction enabled
> as
> > > > well
> > > > > as
> > > > > > log deletion enabled. From that point of view, I feel that
> current
> > > KIP
> > > > > > still has value to be accepted.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <
> wdwarshaw@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Yes, I'd agree that offset is a more precise configuration than
> > > > > > timestamp.
> > > > > > > If there was a way to set a partition-level configuration, I
> > would
> > > > > rather
> > > > > > > use log.retention.min.offset than timestamp.  If you have an
> > > approach
> > > > > in
> > > > > > > mind I'd be open to investigating it.
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io>
> > > wrote:
> > > > > > >
> > > > > > > > Gotcha, good point. But barring that limitation, you agree
> that
> > > > that
> > > > > > > makes
> > > > > > > > more sense?
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <
> > > wdwarshaw@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > The problem with offset as a config option is that offsets
> > are
> > > > > > > > > partition-specific, so we'd need a per-partition config.
> > This
> > > > > would
> > > > > > > work
> > > > > > > > > for our particular use case, where we have single-partition
> > > > topics,
> > > > > > but
> > > > > > > > for
> > > > > > > > > multiple-partition topics it would delete from all
> partitions
> > > > based
> > > > > > on
> > > > > > > a
> > > > > > > > > global topic-level offset.
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <
> jay@confluent.io>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I think you are saying you considered a kind of trim()
> api
> > > that
> > > > > > would
> > > > > > > > > > synchronously chop off the tail of the log starting from
> a
> > > > given
> > > > > > > > offset.
> > > > > > > > > > That would be one option, but what I was saying was
> > slightly
> > > > > > > different:
> > > > > > > > > in
> > > > > > > > > > the proposal you have where there is a config that
> controls
> > > > > > retention
> > > > > > > > > that
> > > > > > > > > > the user would update, wouldn't it make more sense for
> this
> > > > > config
> > > > > > to
> > > > > > > > be
> > > > > > > > > > based on offset rather than timestamp?
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
> > > > > wdwarshaw@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > 1.  Initially I looked at using the actual offset, by
> > > adding
> > > > a
> > > > > > call
> > > > > > > > to
> > > > > > > > > > > AdminUtils to just delete anything in a given
> > > topic/partition
> > > > > to
> > > > > > a
> > > > > > > > > given
> > > > > > > > > > > offset.  I ran into a lot of trouble here trying to
> work
> > > out
> > > > > how
> > > > > > > the
> > > > > > > > > > system
> > > > > > > > > > > would recognize that every broker had successfully
> > deleted
> > > > that
> > > > > > > range
> > > > > > > > > > from
> > > > > > > > > > > the partition before returning to the client.  If we
> were
> > > ok
> > > > > > > treating
> > > > > > > > > > this
> > > > > > > > > > > as a completely asynchronous operation I would be open
> to
> > > > > > > revisiting
> > > > > > > > > this
> > > > > > > > > > > approach.
> > > > > > > > > > >
> > > > > > > > > > > 2.  For our use case, we would be updating the config
> > every
> > > > few
> > > > > > > hours
> > > > > > > > > > for a
> > > > > > > > > > > given topic, and there would not be a sizable amount
> of
> > > > > > > > consumers.  I
> > > > > > > > > > > imagine that this would not scale well if someone was
> > > > adjusting
> > > > > > > this
> > > > > > > > > > config
> > > > > > > > > > > very frequently on a large system, but I don't know if
> > > there
> > > > > are
> > > > > > > any
> > > > > > > > > use
> > > > > > > > > > > cases where that would occur.  I imagine most use cases
> > > would
> > > > > > > involve
> > > > > > > > > > > truncating the log after taking a snapshot or doing
> some
> > > > other
> > > > > > > > > expensive
> > > > > > > > > > > operation that didn't occur very frequently.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <
> > > jay@confluent.io>
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Two comments:
> > > > > > > > > > > >
> > > > > > > > > > > >    1. Is there a reason to use physical time rather
> > than
> > > > > > offset?
> > > > > > > > The
> > > > > > > > > > idea
> > > > > > > > > > > >    is for the consumer to say when it has consumed
> > > > something
> > > > > so
> > > > > > > it
> > > > > > > > > can
> > > > > > > > > > be
> > > > > > > > > > > >    deleted, right? It seems like offset would be a
> much
> > > > more
> > > > > > > > precise
> > > > > > > > > > way
> > > > > > > > > > > > to do
> > > > > > > > > > > >    this--i.e. the consumer says "I have checkpointed
> > > state
> > > > up
> > > > > > to
> > > > > > > > > > offset X
> > > > > > > > > > > > you
> > > > > > > > > > > >    can get rid of anything prior to that". Doing this
> > by
> > > > > > > timestamp
> > > > > > > > > > seems
> > > > > > > > > > > > like
> > > > > > > > > > > >    it is just more error prone...
> > > > > > > > > > > >    2. Is this mechanism practical to use at scale? It
> > > > > requires
> > > > > > > > > several
> > > > > > > > > > ZK
> > > > > > > > > > > >    writes per config change, so I guess that depends
> on
> > > how
> > > > > > > > > frequently
> > > > > > > > > > > the
> > > > > > > > > > > >    consumers would update the value and how many
> > > consumers
> > > > > > there
> > > > > > > > > > > are...any
> > > > > > > > > > > >    thoughts on this?
> > > > > > > > > > > >
> > > > > > > > > > > > -Jay
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
> > > > > > > wdwarshaw@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now
> that
> > > > KIP-33
> > > > > > has
> > > > > > > > > been
> > > > > > > > > > > > > accepted and is in-progress.  I've updated the KIP
> (
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > ).
> > > > > > > > > > > > > I have a commit with the functionality for KIP-47
> > ready
> > > > to
> > > > > go
> > > > > > > > once
> > > > > > > > > > > KIP-33
> > > > > > > > > > > > > is complete; it's a fairly minor change.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
> > > > > > > gwen@confluent.io>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Do you mind updating the KIP with the time formats
> we
> > > plan
> > > > > on
> > > > > > > > > > supporting
> > > > > > > > > > > > > > in the configuration?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
> > > > > > > > > wdwarshaw@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > > >
> > >
> >
>

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Bill Warshaw <wd...@gmail.com>.
Hi Jun,

Those are valid concerns.  For our particular use case, application events
triggering the timestamp update will never occur more than once an hour,
and we maintain a sliding window so that we don't delete messages too close
to what our consumers may be reading.
For more general use cases, developers will need to be aware of these
issues, and would need to write their application code with that in mind.


To your second point: I initially wanted to just have a trim() admin api.
I started implementing it, but ran into difficulties with synchronously
acknowledging to the calling code that all brokers had truncated the given
partitions.  It seemed like we would have to do something similar to how
topic deletion is implemented, where the initial broker uses Zookeeper to
coordinate the deletion on the other brokers.  If you have a simpler idea
in mind, I'd be happy to update this KIP to provide a trim() api instead.
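
For reference, the semantics of the trim() call Jun proposes below can be
modeled in a few lines. This is a sketch of the intended behavior only (a
String "topic-partition" key stands in for a real TopicPartition object,
and the in-memory log is an assumption): trimming advances each
partition's log start offset, never moving it backwards or past the end
of the log.

```java
import java.util.Map;

// Illustrative model of the proposed trim(Map<TopicPartition, Long>) API.
class TrimSketch {
    static class PartitionLog {
        long logStartOffset;
        final long logEndOffset;
        PartitionLog(long start, long end) {
            this.logStartOffset = start;
            this.logEndOffset = end;
        }
    }

    // Advance each partition's log start offset to the requested offset,
    // clamped so it never moves backwards or past the end of the log.
    static void trim(Map<String, PartitionLog> logs, Map<String, Long> offsetsToTruncate) {
        for (Map.Entry<String, Long> e : offsetsToTruncate.entrySet()) {
            PartitionLog log = logs.get(e.getKey());
            if (log == null) {
                continue; // unknown partition: ignored in this sketch
            }
            long target = Math.min(e.getValue(), log.logEndOffset);
            log.logStartOffset = Math.max(log.logStartOffset, target);
        }
    }
}
```

Because the request carries per-partition offsets, it sidesteps both the
out-of-order-timestamp problem and the need to rewrite a topic
configuration continuously.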

On Mon, Oct 24, 2016 at 8:15 PM Jun Rao <ju...@confluent.io> wrote:

> Hi, Bill,
>
> Thanks for the proposal. Sorry for the late reply.
>
> The motivation of the proposal makes sense: don't delete the messages until
> the application tells you so.
>
> I am wondering if the current proposal is the best way to address the need
> though. There are a couple of issues that I saw with the proposal. (1)
> Messages in the log may not always be stored in increasing timestamp order.
> Suppose that the application sets log.retention.min.timestamp to T and
> after that messages with timestamp older than T (either due to delay or
> reprocessing) are published to that topic. Those newly published messages
> are likely going to be deleted immediately before the application gets a
> chance to read them, which is probably not what the application wants. (2)
> The configuration for the topic has to be changed continuously to implement
> the use case. Intuitively, one probably shouldn't be changing a
> configuration all the time.
>
> Another way to achieve the goal is what Jay mentioned earlier. We could add
> a trim() api like the following that will trim the log up to the specified
> offsets. This addresses both of the above issues that I mentioned. Will
> that work for you?
>
> void trim(Map<TopicPartition, Long> offsetsToTruncate)
>
> Thanks,
>
> Jun
>
> On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wd...@gmail.com> wrote:
>
> > Bumping for visibility.  KIP is here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 47+-+Add+timestamp-based+log+deletion+policy
> >
> > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com>
> wrote:
> >
> > > Hello Guozhang,
> > >
> > > KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new
> deletion
> > > policy (minimum timestamp), while KIP-71 is allowing deletion and
> > > compaction to coexist.
> > >
> > > They both will touch LogManager, but the change for KIP-47 is very
> > > isolated.
> > >
> > > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > > Hi Bill,
> > >
> > > I would like to reason if there is any correlation between this KIP and
> > > KIP-71
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > >
> > > I feel they are orthogonal but would like to double check with you.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com>
> > > wrote:
> > >
> > > > I'd like to re-awaken this voting thread now that KIP-33 has merged.
> > > This
> > > > KIP is now completely unblocked.  I have a working branch off of
> trunk
> > > with
> > > > my proposed fix, including testing.
> > > >
> > > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > > >
> > > > > Jay, Bill:
> > > > >
> > > > > I'm thinking of one general use case of using timestamp rather than
> > > > offset
> > > > > for log deletion, which is that for expiration handling in data
> > > > > replication, when the source data store decides to expire some data
> > > > records
> > > > > based on their timestamps, today we need to configure the
> > corresponding
> > > > > Kafka changelog topic for compaction, and actively send a tombstone
> > for
> > > > > each expired record. Since expiration usually happens with a bunch
> of
> > > > > records, this could generate large tombstone traffic. For example I
> > > think
> > > > > LI's data replication for Espresso is seeing similar issues and
> they
> > > are
> > > > > just not sending tombstone at all.
> > > > >
> > > > > With timestamp based log deletion policy, this can be easily
> handled
> > by
> > > > > simply setting the current expiration timestamp; but ideally one
> > would
> > > > > prefer to configure this topic to be both log compaction enabled as
> > > well
> > > > as
> > > > > log deletion enabled. From that point of view, I feel that current
> > KIP
> > > > > still has value to be accepted.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wd...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Yes, I'd agree that offset is a more precise configuration than
> > > > > timestamp.
> > > > > > If there was a way to set a partition-level configuration, I
> would
> > > > rather
> > > > > > use log.retention.min.offset than timestamp.  If you have an
> > approach
> > > > in
> > > > > > mind I'd be open to investigating it.
> > > > > >
> > > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io>
> > wrote:
> > > > > >
> > > > > > > Gotcha, good point. But barring that limitation, you agree that
> > > that
> > > > > > makes
> > > > > > > more sense?
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <
> > wdwarshaw@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > The problem with offset as a config option is that offsets
> are
> > > > > > > > partition-specific, so we'd need a per-partition config.
> This
> > > > would
> > > > > > work
> > > > > > > > for our particular use case, where we have single-partition
> > > topics,
> > > > > but
> > > > > > > for
> > > > > > > > multiple-partition topics it would delete from all partitions
> > > based
> > > > > on
> > > > > > a
> > > > > > > > global topic-level offset.
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <ja...@confluent.io>
> > > > wrote:
> > > > > > > >
> > > > > > > > > I think you are saying you considered a kind of trim() api
> > that
> > > > > would
> > > > > > > > > synchronously chop off the tail of the log starting from a
> > > given
> > > > > > > offset.
> > > > > > > > > That would be one option, but what I was saying was
> slightly
> > > > > > different:
> > > > > > > > in
> > > > > > > > > the proposal you have where there is a config that controls
> > > > > retention
> > > > > > > > that
> > > > > > > > > the user would update, wouldn't it make more sense for this
> > > > config
> > > > > to
> > > > > > > be
> > > > > > > > > based on offset rather than timestamp?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
> > > > wdwarshaw@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > 1.  Initially I looked at using the actual offset, by
> > adding
> > > a
> > > > > call
> > > > > > > to
> > > > > > > > > > AdminUtils to just delete anything in a given
> > topic/partition
> > > > to
> > > > > a
> > > > > > > > given
> > > > > > > > > > offset.  I ran into a lot of trouble here trying to work
> > out
> > > > how
> > > > > > the
> > > > > > > > > system
> > > > > > > > > > would recognize that every broker had successfully
> deleted
> > > that
> > > > > > range
> > > > > > > > > from
> > > > > > > > > > the partition before returning to the client.  If we were
> > ok
> > > > > > treating
> > > > > > > > > this
> > > > > > > > > > as a completely asynchronous operation I would be open to
> > > > > > revisiting
> > > > > > > > this
> > > > > > > > > > approach.
> > > > > > > > > >
> > > > > > > > > > 2.  For our use case, we would be updating the config
> every
> > > few
> > > > > > hours
> > > > > > > > > for a
> > > > > > > > > > given topic, and there would not be a sizable number of
> > > > > > > consumers.  I
> > > > > > > > > > imagine that this would not scale well if someone was
> > > adjusting
> > > > > > this
> > > > > > > > > config
> > > > > > > > > > very frequently on a large system, but I don't know if
> > there
> > > > are
> > > > > > any
> > > > > > > > use
> > > > > > > > > > cases where that would occur.  I imagine most use cases
> > would
> > > > > > involve
> > > > > > > > > > truncating the log after taking a snapshot or doing some
> > > other
> > > > > > > > expensive
> > > > > > > > > > operation that didn't occur very frequently.
> > > > > > > > > >
> > > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <
> > jay@confluent.io>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Two comments:
> > > > > > > > > > >
> > > > > > > > > > >    1. Is there a reason to use physical time rather
> than
> > > > > offset?
> > > > > > > The
> > > > > > > > > idea
> > > > > > > > > > >    is for the consumer to say when it has consumed
> > > something
> > > > so
> > > > > > it
> > > > > > > > can
> > > > > > > > > be
> > > > > > > > > > >    deleted, right? It seems like offset would be a much
> > > more
> > > > > > > precise
> > > > > > > > > way
> > > > > > > > > > > to do
> > > > > > > > > > >    this--i.e. the consumer says "I have checkpointed
> > state
> > > up
> > > > > to
> > > > > > > > > offset X
> > > > > > > > > > > you
> > > > > > > > > > >    can get rid of anything prior to that". Doing this
> by
> > > > > > timestamp
> > > > > > > > > seems
> > > > > > > > > > > like
> > > > > > > > > > >    it is just more error prone...
> > > > > > > > > > >    2. Is this mechanism practical to use at scale? It
> > > > requires
> > > > > > > > several
> > > > > > > > > ZK
> > > > > > > > > > >    writes per config change, so I guess that depends on
> > how
> > > > > > > > frequently
> > > > > > > > > > the
> > > > > > > > > > >    consumers would update the value and how many
> > consumers
> > > > > there
> > > > > > > > > > are...any
> > > > > > > > > > >    thoughts on this?
> > > > > > > > > > >
> > > > > > > > > > > -Jay
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
> > > > > > wdwarshaw@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
> > > KIP-33
> > > > > has
> > > > > > > > been
> > > > > > > > > > > > accepted and is in-progress.  I've updated the KIP (
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > ).
> > > > > > > > > > > > I have a commit with the functionality for KIP-47
> ready
> > > to
> > > > go
> > > > > > > once
> > > > > > > > > > KIP-33
> > > > > > > > > > > > is complete; it's a fairly minor change.
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
> > > > > > gwen@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > > >
> > > > > > > > > > > > > Do you mind updating the KIP with  time formats we
> > plan
> > > > on
> > > > > > > > > supporting
> > > > > > > > > > > > > in the configuration?
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
> > > > > > > > wdwarshaw@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> > >
> >
>

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Jun Rao <ju...@confluent.io>.
Hi, Bill,

Thanks for the proposal. Sorry for the late reply.

The motivation of the proposal makes sense: don't delete the messages until
the application tells you so.

I am wondering if the current proposal is the best way to address the need
though. There are a couple of issues that I saw with the proposal. (1)
Messages in the log may not always be stored in increasing timestamp order.
Suppose that the application sets log.retention.min.timestamp to T and
after that messages with timestamp older than T (either due to delay or
reprocessing) are published to that topic. Those newly published messages
are likely going to be deleted immediately before the application gets a
chance to read them, which is probably not what the application wants. (2)
The configuration for the topic has to be changed continuously to implement
the use case. Intuitively, one probably shouldn't be changing a
configuration all the time.
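
The first concern can be made concrete with a small self-contained simulation of a retention sweep: a check that deletes any record older than the configured minimum timestamp will also discard a late-arriving record the moment it lands. (This is an illustrative sketch; `LogRecord` and `retainAbove` are hypothetical names, not Kafka internals.)

```java
import java.util.ArrayList;
import java.util.List;

public class TimestampRetentionSketch {
    // A record is just (offset, timestamp) for this illustration.
    record LogRecord(long offset, long timestamp) {}

    // Keep only records whose timestamp is >= minTimestamp, mimicking a
    // log.retention.min.timestamp sweep.
    static List<LogRecord> retainAbove(List<LogRecord> log, long minTimestamp) {
        List<LogRecord> kept = new ArrayList<>();
        for (LogRecord r : log) {
            if (r.timestamp() >= minTimestamp) kept.add(r);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<LogRecord> log = new ArrayList<>(List.of(
                new LogRecord(0, 100), new LogRecord(1, 200)));
        // The application has consumed everything up to timestamp 150,
        // so it sets log.retention.min.timestamp = 150.
        List<LogRecord> afterSweep = retainAbove(log, 150);
        // A delayed (or reprocessed) message with an old timestamp arrives
        // after the config change...
        afterSweep.add(new LogRecord(2, 120));
        // ...and the very next sweep deletes it before anyone can read it.
        List<LogRecord> afterNextSweep = retainAbove(afterSweep, 150);
        System.out.println(afterNextSweep.size()); // prints 1 -- the late record at offset 2 is gone
    }
}
```

An offset-based cut-off would not have this problem, since the late record still receives a new, higher offset.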

Another way to achieve the goal is what Jay mentioned earlier. We could add
a trim() api like the following that will trim the log up to the specified
offsets. This addresses both of the above issues that I mentioned. Will
that work for you?

void trim(Map<TopicPartition, Long> offsetsToTruncate)
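
Since Kafka deletes data at segment granularity, one plausible broker-side semantics for such a trim() call is: drop every whole log segment whose records all lie below the requested offset, and keep the segment containing that offset. The sketch below models this on a list of segments (`Segment` and `trim` are illustrative names, not actual Kafka internals).

```java
import java.util.ArrayList;
import java.util.List;

public class TrimSemanticsSketch {
    // A segment covers offsets [baseOffset, lastOffset], inclusive.
    record Segment(long baseOffset, long lastOffset) {}

    // Delete every segment that lies entirely below trimOffset; the segment
    // containing trimOffset is kept, so the new log start may be slightly
    // earlier than requested (segment-granularity deletion).
    static List<Segment> trim(List<Segment> segments, long trimOffset) {
        List<Segment> kept = new ArrayList<>();
        for (Segment s : segments) {
            if (s.lastOffset() >= trimOffset) kept.add(s);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment(0, 99), new Segment(100, 199), new Segment(200, 299));
        // The consumer has checkpointed state up to offset 150, so anything
        // before that may go; only the segment ending at 99 is fully below 150.
        List<Segment> remaining = trim(segments, 150);
        System.out.println(remaining.size()); // prints 2: segments [100,199] and [200,299] remain
    }
}
```

Because the caller names an explicit per-partition offset, nothing has to be reconfigured continuously and late-arriving records (which always get higher offsets) are never affected.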

Thanks,

Jun

On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wd...@gmail.com> wrote:

> Bumping for visibility.  KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 47+-+Add+timestamp-based+log+deletion+policy
>
> On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com> wrote:
>
> > Hello Guozhang,
> >
> > KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new deletion
> > policy (minimum timestamp), while KIP-71 is allowing deletion and
> > compaction to coexist.
> >
> > They both will touch LogManager, but the change for KIP-47 is very
> > isolated.
> >
> > On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > Hi Bill,
> >
> > I would like to reason about whether there is any correlation between this KIP and
> > KIP-71
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 71%3A+Enable+log+compaction+and+deletion+to+co-exist
> >
> > I feel they are orthogonal but would like to double check with you.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com>
> > wrote:
> >
> > > I'd like to re-awaken this voting thread now that KIP-33 has merged.
> > This
> > > KIP is now completely unblocked.  I have a working branch off of trunk
> > with
> > > my proposed fix, including testing.
> > >
> > > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> > >
> > > > Jay, Bill:
> > > >
> > > > I'm thinking of one general use case of using timestamp rather than
> > > offset
> > > > for log deletion, which is that for expiration handling in data
> > > > replication, when the source data store decides to expire some data
> > > records
> > > > based on their timestamps, today we need to configure the
> corresponding
> > > > Kafka changelog topic for compaction, and actively send a tombstone
> for
> > > > each expired record. Since expiration usually happens with a bunch of
> > > > records, this could generate large tombstone traffic. For example I
> > think
> > > > LI's data replication for Espresso is seeing similar issues and they
> > are
> > > > just not sending tombstone at all.
> > > >
> > > > With timestamp based log deletion policy, this can be easily handled
> by
> > > > simply setting the current expiration timestamp; but ideally one
> would
> > > > prefer to configure this topic to be both log compaction enabled as
> > well
> > > as
> > > > log deletion enabled. From that point of view, I feel that current
> KIP
> > > > still has value to be accepted.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wd...@gmail.com>
> > > wrote:
> > > >
> > > > > Yes, I'd agree that offset is a more precise configuration than
> > > > timestamp.
> > > > > If there was a way to set a partition-level configuration, I would
> > > rather
> > > > > use log.retention.min.offset than timestamp.  If you have an
> approach
> > > in
> > > > > mind I'd be open to investigating it.
> > > > >
> > > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io>
> wrote:
> > > > >
> > > > > > Gotcha, good point. But barring that limitation, you agree that
> > that
> > > > > makes
> > > > > > more sense?
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <
> wdwarshaw@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > The problem with offset as a config option is that offsets are
> > > > > > > partition-specific, so we'd need a per-partition config.  This
> > > would
> > > > > work
> > > > > > > for our particular use case, where we have single-partition
> > topics,
> > > > but
> > > > > > for
> > > > > > > multiple-partition topics it would delete from all partitions
> > based
> > > > on
> > > > > a
> > > > > > > global topic-level offset.
> > > > > > >
> > > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <ja...@confluent.io>
> > > wrote:
> > > > > > >
> > > > > > > > I think you are saying you considered a kind of trim() api
> that
> > > > would
> > > > > > > > synchronously chop off the tail of the log starting from a
> > given
> > > > > > offset.
> > > > > > > > That would be one option, but what I was saying was slightly
> > > > > different:
> > > > > > > in
> > > > > > > > the proposal you have where there is a config that controls
> > > > retention
> > > > > > > that
> > > > > > > > the user would update, wouldn't it make more sense for this
> > > config
> > > > to
> > > > > > be
> > > > > > > > based on offset rather than timestamp?
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
> > > wdwarshaw@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > 1.  Initially I looked at using the actual offset, by
> adding
> > a
> > > > call
> > > > > > to
> > > > > > > > > AdminUtils to just delete anything in a given
> topic/partition
> > > to
> > > > a
> > > > > > > given
> > > > > > > > > offset.  I ran into a lot of trouble here trying to work
> out
> > > how
> > > > > the
> > > > > > > > system
> > > > > > > > > would recognize that every broker had successfully deleted
> > that
> > > > > range
> > > > > > > > from
> > > > > > > > > the partition before returning to the client.  If we were
> ok
> > > > > treating
> > > > > > > > this
> > > > > > > > > as a completely asynchronous operation I would be open to
> > > > > revisiting
> > > > > > > this
> > > > > > > > > approach.
> > > > > > > > >
> > > > > > > > > 2.  For our use case, we would be updating the config every
> > few
> > > > > hours
> > > > > > > > for a
> > > > > > > > > given topic, and there would not be a sizable number of
> > > > > > consumers.  I
> > > > > > > > > imagine that this would not scale well if someone was
> > adjusting
> > > > > this
> > > > > > > > config
> > > > > > > > > very frequently on a large system, but I don't know if
> there
> > > are
> > > > > any
> > > > > > > use
> > > > > > > > > cases where that would occur.  I imagine most use cases
> would
> > > > > involve
> > > > > > > > > truncating the log after taking a snapshot or doing some
> > other
> > > > > > > expensive
> > > > > > > > > operation that didn't occur very frequently.
> > > > > > > > >
> > > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <
> jay@confluent.io>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Two comments:
> > > > > > > > > >
> > > > > > > > > >    1. Is there a reason to use physical time rather than
> > > > offset?
> > > > > > The
> > > > > > > > idea
> > > > > > > > > >    is for the consumer to say when it has consumed
> > something
> > > so
> > > > > it
> > > > > > > can
> > > > > > > > be
> > > > > > > > > >    deleted, right? It seems like offset would be a much
> > more
> > > > > > precise
> > > > > > > > way
> > > > > > > > > > to do
> > > > > > > > > >    this--i.e. the consumer says "I have checkpointed
> state
> > up
> > > > to
> > > > > > > > offset X
> > > > > > > > > > you
> > > > > > > > > >    can get rid of anything prior to that". Doing this by
> > > > > timestamp
> > > > > > > > seems
> > > > > > > > > > like
> > > > > > > > > >    it is just more error prone...
> > > > > > > > > >    2. Is this mechanism practical to use at scale? It
> > > requires
> > > > > > > several
> > > > > > > > ZK
> > > > > > > > > >    writes per config change, so I guess that depends on
> how
> > > > > > > frequently
> > > > > > > > > the
> > > > > > > > > >    consumers would update the value and how many
> consumers
> > > > there
> > > > > > > > > are...any
> > > > > > > > > >    thoughts on this?
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
> > > > > wdwarshaw@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
> > KIP-33
> > > > has
> > > > > > > been
> > > > > > > > > > > accepted and is in-progress.  I've updated the KIP (
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > ).
> > > > > > > > > > > I have a commit with the functionality for KIP-47 ready
> > to
> > > go
> > > > > > once
> > > > > > > > > KIP-33
> > > > > > > > > > > is complete; it's a fairly minor change.
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
> > > > > gwen@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > For convenience, the KIP is here:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 47+-+Add+timestamp-based+log+deletion+policy
> > > > > > > > > > > >
> > > > > > > > > > > > Do you mind updating the KIP with  time formats we
> plan
> > > on
> > > > > > > > supporting
> > > > > > > > > > > > in the configuration?
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
> > > > > > > wdwarshaw@gmail.com
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > Hello,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Bill Warshaw
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
>

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Guozhang Wang <wa...@gmail.com>.
+1.

On Fri, Oct 7, 2016 at 3:35 PM, Gwen Shapira <gw...@confluent.io> wrote:

> +1 (binding)
>
> On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wd...@gmail.com> wrote:
> > Bumping for visibility.  KIP is here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+
> Add+timestamp-based+log+deletion+policy
> >
> > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com>
> wrote:
> >
> >> Hello Guozhang,
> >>
> >> KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new
> deletion
> >> policy (minimum timestamp), while KIP-71 is allowing deletion and
> >> compaction to coexist.
> >>
> >> They both will touch LogManager, but the change for KIP-47 is very
> >> isolated.
> >>
> >> On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >>
> >> Hi Bill,
> >>
> >> I would like to reason about whether there is any correlation between this KIP and
> >> KIP-71
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+
> Enable+log+compaction+and+deletion+to+co-exist
> >>
> >> I feel they are orthogonal but would like to double check with you.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com>
> >> wrote:
> >>
> >> > I'd like to re-awaken this voting thread now that KIP-33 has merged.
> >> This
> >> > KIP is now completely unblocked.  I have a working branch off of trunk
> >> with
> >> > my proposed fix, including testing.
> >> >
> >> > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >> >
> >> > > Jay, Bill:
> >> > >
> >> > > I'm thinking of one general use case of using timestamp rather than
> >> > offset
> >> > > for log deletion, which is that for expiration handling in data
> >> > > replication, when the source data store decides to expire some data
> >> > records
> >> > > based on their timestamps, today we need to configure the
> corresponding
> >> > > Kafka changelog topic for compaction, and actively send a tombstone
> for
> >> > > each expired record. Since expiration usually happens with a bunch
> of
> >> > > records, this could generate large tombstone traffic. For example I
> >> think
> >> > > LI's data replication for Espresso is seeing similar issues and they
> >> are
> >> > > just not sending tombstone at all.
> >> > >
> >> > > With timestamp based log deletion policy, this can be easily
> handled by
> >> > > simply setting the current expiration timestamp; but ideally one
> would
> >> > > prefer to configure this topic to be both log compaction enabled as
> >> well
> >> > as
> >> > > log deletion enabled. From that point of view, I feel that current
> KIP
> >> > > still has value to be accepted.
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wd...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > Yes, I'd agree that offset is a more precise configuration than
> >> > > timestamp.
> >> > > > If there was a way to set a partition-level configuration, I would
> >> > rather
> >> > > > use log.retention.min.offset than timestamp.  If you have an
> approach
> >> > in
> >> > > > mind I'd be open to investigating it.
> >> > > >
> >> > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io>
> wrote:
> >> > > >
> >> > > > > Gotcha, good point. But barring that limitation, you agree that
> >> that
> >> > > > makes
> >> > > > > more sense?
> >> > > > >
> >> > > > > -Jay
> >> > > > >
> >> > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <
> wdwarshaw@gmail.com>
> >> > > > wrote:
> >> > > > >
> >> > > > > > The problem with offset as a config option is that offsets are
> >> > > > > > partition-specific, so we'd need a per-partition config.  This
> >> > would
> >> > > > work
> >> > > > > > for our particular use case, where we have single-partition
> >> topics,
> >> > > but
> >> > > > > for
> >> > > > > > multiple-partition topics it would delete from all partitions
> >> based
> >> > > on
> >> > > > a
> >> > > > > > global topic-level offset.
> >> > > > > >
> >> > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <ja...@confluent.io>
> >> > wrote:
> >> > > > > >
> >> > > > > > > I think you are saying you considered a kind of trim() api
> that
> >> > > would
> >> > > > > > > synchronously chop off the tail of the log starting from a
> >> given
> >> > > > > offset.
> >> > > > > > > That would be one option, but what I was saying was slightly
> >> > > > different:
> >> > > > > > in
> >> > > > > > > the proposal you have where there is a config that controls
> >> > > retention
> >> > > > > > that
> >> > > > > > > the user would update, wouldn't it make more sense for this
> >> > config
> >> > > to
> >> > > > > be
> >> > > > > > > based on offset rather than timestamp?
> >> > > > > > >
> >> > > > > > > -Jay
> >> > > > > > >
> >> > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
> >> > wdwarshaw@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > 1.  Initially I looked at using the actual offset, by
> adding
> >> a
> >> > > call
> >> > > > > to
> >> > > > > > > > AdminUtils to just delete anything in a given
> topic/partition
> >> > to
> >> > > a
> >> > > > > > given
> >> > > > > > > > offset.  I ran into a lot of trouble here trying to work
> out
> >> > how
> >> > > > the
> >> > > > > > > system
> >> > > > > > > > would recognize that every broker had successfully deleted
> >> that
> >> > > > range
> >> > > > > > > from
> >> > > > > > > > the partition before returning to the client.  If we were
> ok
> >> > > > treating
> >> > > > > > > this
> >> > > > > > > > as a completely asynchronous operation I would be open to
> >> > > > revisiting
> >> > > > > > this
> >> > > > > > > > approach.
> >> > > > > > > >
> >> > > > > > > > 2.  For our use case, we would be updating the config
> every
> >> few
> >> > > > hours
> >> > > > > > > for a
> >> > > > > > > > given topic, and there would not be a sizable number of
> >> > > > > consumers.  I
> >> > > > > > > > imagine that this would not scale well if someone was
> >> adjusting
> >> > > > this
> >> > > > > > > config
> >> > > > > > > > very frequently on a large system, but I don't know if
> there
> >> > are
> >> > > > any
> >> > > > > > use
> >> > > > > > > > cases where that would occur.  I imagine most use cases
> would
> >> > > > involve
> >> > > > > > > > truncating the log after taking a snapshot or doing some
> >> other
> >> > > > > > expensive
> >> > > > > > > > operation that didn't occur very frequently.
> >> > > > > > > >
> >> > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <
> jay@confluent.io>
> >> > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Two comments:
> >> > > > > > > > >
> >> > > > > > > > >    1. Is there a reason to use physical time rather than
> >> > > offset?
> >> > > > > The
> >> > > > > > > idea
> >> > > > > > > > >    is for the consumer to say when it has consumed
> >> something
> >> > so
> >> > > > it
> >> > > > > > can
> >> > > > > > > be
> >> > > > > > > > >    deleted, right? It seems like offset would be a much
> >> more
> >> > > > > precise
> >> > > > > > > way
> >> > > > > > > > > to do
> >> > > > > > > > >    this--i.e. the consumer says "I have checkpointed
> state
> >> up
> >> > > to
> >> > > > > > > offset X
> >> > > > > > > > > you
> >> > > > > > > > >    can get rid of anything prior to that". Doing this by
> >> > > > timestamp
> >> > > > > > > seems
> >> > > > > > > > > like
> >> > > > > > > > >    it is just more error prone...
> >> > > > > > > > >    2. Is this mechanism practical to use at scale? It
> >> > requires
> >> > > > > > several
> >> > > > > > > ZK
> >> > > > > > > > >    writes per config change, so I guess that depends on
> how
> >> > > > > > frequently
> >> > > > > > > > the
> >> > > > > > > > >    consumers would update the value and how many
> consumers
> >> > > there
> >> > > > > > > > are...any
> >> > > > > > > > >    thoughts on this?
> >> > > > > > > > >
> >> > > > > > > > > -Jay
> >> > > > > > > > >
> >> > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
> >> > > > wdwarshaw@gmail.com
> >> > > > > >
> >> > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
> >> KIP-33
> >> > > has
> >> > > > > > been
> >> > > > > > > > > > accepted and is in-progress.  I've updated the KIP (
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 47+-+Add+timestamp-based+log+deletion+policy
> >> > > > > > > > > > ).
> >> > > > > > > > > > I have a commit with the functionality for KIP-47
> ready
> >> to
> >> > go
> >> > > > > once
> >> > > > > > > > KIP-33
> >> > > > > > > > > > is complete; it's a fairly minor change.
> >> > > > > > > > > >
> >> > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
> >> > > > gwen@confluent.io>
> >> > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > For convenience, the KIP is here:
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 47+-+Add+timestamp-based+log+deletion+policy
> >> > > > > > > > > > >
> >> > > > > > > > > > > Do you mind updating the KIP with  time formats we
> plan
> >> > on
> >> > > > > > > supporting
> >> > > > > > > > > > > in the configuration?
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
> >> > > > > > wdwarshaw@gmail.com
> >> > > > > > > >
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > > Hello,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > Bill Warshaw
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >>
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>



-- 
-- Guozhang

Re: [VOTE] KIP-47 - Add timestamp-based log deletion policy

Posted by Gwen Shapira <gw...@confluent.io>.
+1 (binding)

On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw <wd...@gmail.com> wrote:
> Bumping for visibility.  KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
>
> On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw <wd...@gmail.com> wrote:
>
>> Hello Guozhang,
>>
>> KIP-71 seems unrelated to this KIP.  KIP-47 is just adding a new deletion
>> policy (minimum timestamp), while KIP-71 is allowing deletion and
>> compaction to coexist.
>>
>> They both will touch LogManager, but the change for KIP-47 is very
>> isolated.
>>
>> On Wed, Aug 24, 2016 at 2:21 PM Guozhang Wang <wa...@gmail.com> wrote:
>>
>> Hi Bill,
>>
>> I would like to reason about whether there is any correlation between this KIP and
>> KIP-71
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
>>
>> I feel they are orthogonal but would like to double check with you.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Aug 24, 2016 at 11:05 AM, Bill Warshaw <wd...@gmail.com>
>> wrote:
>>
>> > I'd like to re-awaken this voting thread now that KIP-33 has merged.
>> This
>> > KIP is now completely unblocked.  I have a working branch off of trunk
>> with
>> > my proposed fix, including testing.
>> >
>> > On Mon, May 9, 2016 at 8:30 PM Guozhang Wang <wa...@gmail.com> wrote:
>> >
>> > > Jay, Bill:
>> > >
>> > > I'm thinking of one general use case of using timestamp rather than
>> > offset
>> > > for log deletion, which is that for expiration handling in data
>> > > replication, when the source data store decides to expire some data
>> > records
>> > > based on their timestamps, today we need to configure the corresponding
>> > > Kafka changelog topic for compaction, and actively send a tombstone for
>> > > each expired record. Since expiration usually happens with a bunch of
>> > > records, this could generate large tombstone traffic. For example I
>> think
>> > > LI's data replication for Espresso is seeing similar issues and they
>> are
>> > > just not sending tombstone at all.
>> > >
>> > > With timestamp based log deletion policy, this can be easily handled by
>> > > simply setting the current expiration timestamp; but ideally one would
>> > > prefer to configure this topic to be both log compaction enabled as
>> well
>> > as
>> > > log deletion enabled. From that point of view, I feel that current KIP
>> > > still has value to be accepted.
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Mon, May 2, 2016 at 2:37 PM, Bill Warshaw <wd...@gmail.com>
>> > wrote:
>> > >
>> > > > Yes, I'd agree that offset is a more precise configuration than
>> > > timestamp.
>> > > > If there was a way to set a partition-level configuration, I would
>> > rather
>> > > > use log.retention.min.offset than timestamp.  If you have an approach
>> > in
>> > > > mind I'd be open to investigating it.
>> > > >
>> > > > On Mon, May 2, 2016 at 5:33 PM, Jay Kreps <ja...@confluent.io> wrote:
>> > > >
>> > > > > Gotcha, good point. But barring that limitation, you agree that
>> that
>> > > > makes
>> > > > > more sense?
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Mon, May 2, 2016 at 2:29 PM, Bill Warshaw <wd...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > The problem with offset as a config option is that offsets are
>> > > > > > partition-specific, so we'd need a per-partition config.  This
>> > would
>> > > > work
>> > > > > > for our particular use case, where we have single-partition
>> topics,
>> > > but
>> > > > > for
>> > > > > > multiple-partition topics it would delete from all partitions
>> based
>> > > on
>> > > > a
>> > > > > > global topic-level offset.
>> > > > > >
>> > > > > > On Mon, May 2, 2016 at 4:32 PM, Jay Kreps <ja...@confluent.io>
>> > wrote:
>> > > > > >
>> > > > > > > I think you are saying you considered a kind of trim() api that
>> > > would
>> > > > > > > synchronously chop off the tail of the log starting from a
>> given
>> > > > > offset.
>> > > > > > > That would be one option, but what I was saying was slightly
>> > > > different:
>> > > > > > in
>> > > > > > > the proposal you have where there is a config that controls
>> > > retention
>> > > > > > that
>> > > > > > > the user would update, wouldn't it make more sense for this
>> > config
>> > > to
>> > > > > be
>> > > > > > > based on offset rather than timestamp?
>> > > > > > >
>> > > > > > > -Jay
>> > > > > > >
>> > > > > > > On Mon, May 2, 2016 at 12:53 PM, Bill Warshaw <
>> > wdwarshaw@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > 1.  Initially I looked at using the actual offset, by adding
>> a
>> > > call
>> > > > > to
>> > > > > > > > AdminUtils to just delete anything in a given topic/partition
>> > up to
>> > > a
>> > > > > > given
>> > > > > > > > offset.  I ran into a lot of trouble here trying to work out
>> > how
>> > > > the
>> > > > > > > system
>> > > > > > > > would recognize that every broker had successfully deleted
>> that
>> > > > range
>> > > > > > > from
>> > > > > > > > the partition before returning to the client.  If we were ok
>> > > > treating
>> > > > > > > this
>> > > > > > > > as a completely asynchronous operation I would be open to
>> > > > revisiting
>> > > > > > this
>> > > > > > > > approach.
>> > > > > > > >
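[Editor's note] For comparison, the offset-based variant being discussed would key the same whole-segment decision on offsets instead of timestamps. A rough sketch, where the function name and representation are illustrative only and not a Kafka API:

```python
def segments_below_offset(base_offsets, min_offset):
    """base_offsets: sorted base offsets of a partition's segments.
    A segment can be dropped once the NEXT segment's base offset is
    <= min_offset, i.e. every record it holds is below the trim point.
    The active (last) segment is never dropped."""
    return [base for base, next_base in zip(base_offsets, base_offsets[1:])
            if next_base <= min_offset]
```

Because offsets are partition-specific, this decision only makes sense per partition, which is the per-partition-config problem raised earlier in the thread.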
>> > > > > > > > 2.  For our use case, we would be updating the config every
>> few
>> > > > hours
>> > > > > > > for a
>> > > > > > > > given topic, and there would not be a sizable number of
>> > > > > consumers.  I
>> > > > > > > > imagine that this would not scale well if someone was
>> adjusting
>> > > > this
>> > > > > > > config
>> > > > > > > > very frequently on a large system, but I don't know if there
>> > are
>> > > > any
>> > > > > > use
>> > > > > > > > cases where that would occur.  I imagine most use cases would
>> > > > involve
>> > > > > > > > truncating the log after taking a snapshot or doing some
>> other
>> > > > > > expensive
>> > > > > > > > operation that didn't occur very frequently.
>> > > > > > > >
>> > > > > > > > On Mon, May 2, 2016 at 2:23 PM, Jay Kreps <ja...@confluent.io>
>> > > > wrote:
>> > > > > > > >
>> > > > > > > > > Two comments:
>> > > > > > > > >
>> > > > > > > > >    1. Is there a reason to use physical time rather than
>> > > offset?
>> > > > > The
>> > > > > > > idea
>> > > > > > > > >    is for the consumer to say when it has consumed
>> something
>> > so
>> > > > it
>> > > > > > can
>> > > > > > > be
>> > > > > > > > >    deleted, right? It seems like offset would be a much
>> more
>> > > > > precise
>> > > > > > > way
>> > > > > > > > > to do
>> > > > > > > > >    this--i.e. the consumer says "I have checkpointed state
>> up
>> > > to
>> > > > > > > offset X
>> > > > > > > > > you
>> > > > > > > > >    can get rid of anything prior to that". Doing this by
>> > > > timestamp
>> > > > > > > seems
>> > > > > > > > > like
>> > > > > > > > >    it is just more error prone...
>> > > > > > > > >    2. Is this mechanism practical to use at scale? It
>> > requires
>> > > > > > several
>> > > > > > > ZK
>> > > > > > > > >    writes per config change, so I guess that depends on how
>> > > > > > frequently
>> > > > > > > > the
>> > > > > > > > >    consumers would update the value and how many consumers
>> > > there
>> > > > > > > > are...any
>> > > > > > > > >    thoughts on this?
>> > > > > > > > >
>> > > > > > > > > -Jay
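[Editor's note] Jay's precision point can be made concrete: offsets are unique and totally ordered within a partition, while timestamps need not be, so an offset threshold always reproduces a consumer's checkpoint exactly and a timestamp threshold may not. A toy illustration with invented record tuples:

```python
# (offset, timestamp) pairs; offsets are unique, but timestamps repeat
# and the record at offset 2 arrives with an out-of-order timestamp.
records = [(0, 1000), (1, 1000), (2, 999), (3, 1001)]

def kept_by_offset(recs, min_offset):
    """Retain exactly the records at or beyond the checkpointed offset."""
    return [o for o, _ in recs if o >= min_offset]

def kept_by_timestamp(recs, min_ts):
    """Retain records whose timestamp is at or beyond the threshold."""
    return [o for o, t in recs if t >= min_ts]
```

A consumer checkpointed through offset 1 wants exactly offsets 2 and 3 retained; `kept_by_offset(records, 2)` gives that, but no timestamp cutoff does: 1000 drops the out-of-order record at offset 2, while 999 retains everything.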
>> > > > > > > > >
>> > > > > > > > > On Thu, Apr 28, 2016 at 8:28 AM, Bill Warshaw <
>> > > > wdwarshaw@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > I'd like to re-initiate the vote for KIP-47 now that
>> KIP-33
>> > > has
>> > > > > > been
>> > > > > > accepted and is in progress.  I've updated the KIP (
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
>> > > > > > > > > > ).
>> > > > > > > > > > I have a commit with the functionality for KIP-47 ready
>> to
>> > go
>> > > > > once
>> > > > > > > > KIP-33
>> > > > > > > > > > is complete; it's a fairly minor change.
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Mar 9, 2016 at 8:42 PM, Gwen Shapira <
>> > > > gwen@confluent.io>
>> > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > For convenience, the KIP is here:
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-47+-+Add+timestamp-based+log+deletion+policy
>> > > > > > > > > > >
>> > > > > > > > > > > Do you mind updating the KIP with the time formats we plan
>> > on
>> > > > > > > supporting
>> > > > > > > > > > > in the configuration?
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Mar 9, 2016 at 11:44 AM, Bill Warshaw <
>> > > > > > wdwarshaw@gmail.com
>> > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > > > > Hello,
>> > > > > > > > > > > >
>> > > > > > > > > > > > I'd like to initiate the vote for KIP-47.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Bill Warshaw
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>>



-- 
Gwen Shapira
Product Manager | Confluent