Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2018/06/01 18:51:36 UTC

Re: [DISCUSS] KIP-280: Enhanced log compaction

Hello Luis,

Please feel free to continue with the voting process, as there seem to be
no further comments on this thread (I have synced with Jun and Ismael
separately offline, and they agree with the approach of adding the
fields to the offset map for all cases).

We can still continue reviewing the PR while voting on the thread, so
that it can get into trunk earlier for the next release.



Guozhang


On Mon, May 28, 2018 at 11:04 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Luis,
>
> this week is feature freeze for the upcoming 2.0 release and most people
> are focused on getting their PRs merged. Thus, for this week and the next
> (until code freeze), KIPs for 2.1 are not a high priority for most people.
>
> Please bear with us. Thanks for your understanding.
>
>
> -Matthias
>
> On 5/28/18 5:21 AM, Luís Cabral wrote:
> >  Hi Guozhang,
> >
> > It doesn't look like there will be much feedback here.
> > Is it alright if I just update the spec back to a standardized
> > behaviour and move this along?
> >
> > Cheers,
> > Luis
> >
> > On Thursday, May 24, 2018, 11:20:01 AM GMT+2, Luis Cabral
> > <luis_cabral@yahoo.com> wrote:
> >
> >  Hi Jun / Ismael,
> >
> > Any chance to get your opinion on this?
> > Thanks in advance!
> >
> > Regards,
> > Luís
> >
> >> On 22 May 2018, at 17:30, Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >> Hello Luís,
> >>
> >> While reviewing your PR I realized my previous calculation of the memory
> >> usage was incorrect: in fact, in the current implementation, each entry
> >> in the memory-bounded cache is 16 (default MD5 hash digest length) + 8
> >> (long type) = 24 bytes, and if we add the long-typed version value it is
> >> 32 bytes. I.e. each entry will grow by 33%, not double.
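
To make the arithmetic above concrete, here is a minimal sketch in Python.
The per-field sizes are the ones quoted in the message above; the 128 MiB
budget and the helper names (entry_size, slots) are assumptions chosen only
for illustration, not Kafka's actual code or configuration.

```python
HASH_BYTES = 16     # MD5 digest length, as stated in the thread
OFFSET_BYTES = 8    # long-typed offset
VERSION_BYTES = 8   # long-typed version value proposed by the KIP


def entry_size(with_version):
    """Bytes per cache entry, with or without the extra version field."""
    return HASH_BYTES + OFFSET_BYTES + (VERSION_BYTES if with_version else 0)


def slots(memory_bytes, with_version):
    """How many whole entries fit in a fixed memory budget."""
    return memory_bytes // entry_size(with_version)


budget = 128 * 1024 * 1024          # hypothetical buffer size
current = entry_size(False)         # 16 + 8
proposed = entry_size(True)         # 16 + 8 + 8
growth = (proposed - current) / current
capacity_ratio = slots(budget, True) / slots(budget, False)
```

Under these assumptions each entry grows by one third, and the same buffer
holds roughly three quarters as many entries as before.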
> >>
> >> After redoing the math I'm leaning a bit towards just adding this entry
> >> for all cases rather than treating timestamps differently from the
> >> others (sorry for going back and forth, but I just want to make sure
> >> we've got a good balance between efficiency and semantic consistency).
> >> I've also chatted with Jun and Ismael about this (cc'ed), and maybe you
> >> guys can chime in here as well.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, May 22, 2018 at 6:45 AM, Luís Cabral <Lu...@yahoo.com.invalid>
> >> wrote:
> >>
> >>> Hi Matthias / Guozhang,
> >>>
> >>> Were the questions clarified?
> >>> Please feel free to add more feedback, otherwise it would be nice to
> >>> move this topic onwards 😊
> >>>
> >>> Kind Regards,
> >>> Luís Cabral
> >>>
> >>> From: Guozhang Wang
> >>> Sent: 09 May 2018 20:00
> >>> To: dev@kafka.apache.org
> >>> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> >>>
> >>> I have thought about consistency of strategy vs. the practical concerns
> >>> about storage and their impact on compaction effectiveness.
> >>>
> >>> The difference between the timestamp and the header key-value pairs is
> >>> that for the latter, as I mentioned before, "it is arguably out of
> >>> Kafka's control, and indeed users may (mistakenly) generate many records
> >>> with the same key and the same header value." So giving up tie breakers
> >>> may result in very poor compaction effectiveness when that happens,
> >>> while for timestamps the likelihood of this is considered very small.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Sun, May 6, 2018 at 8:55 PM, Matthias J. Sax <matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Thanks.
> >>>>
> >>>> To reverse the question: if this argument holds, why does it not apply
> >>>> to the case when the header key is used as compaction attribute?
> >>>>
> >>>> I am not against keeping both records in case timestamps are equal,
> >>>> but shouldn't we apply the same strategy for all cases and not use the
> >>>> offset as a tie-breaker at all?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>> On 5/6/18 8:47 PM, Guozhang Wang wrote:
> >>>>> Hello Matthias,
> >>>>>
> >>>>> The related discussion was in the PR:
> >>>>> https://github.com/apache/kafka/pull/4822#discussion_r184588037
> >>>>>
> >>>>> The concern is that, to use the offset as a tie breaker, we need to
> >>>>> double the size of each entry in the bounded compaction cache, and
> >>>>> hence greatly reduce the effectiveness of the compaction itself.
> >>>>> Since, with millisecond timestamps, ties for the same key are expected
> >>>>> to be rare, I think it would be a reasonable tradeoff to make.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Sun, May 6, 2018 at 9:37 AM, Matthias J. Sax <matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I just updated myself on this KIP. One question (maybe it was
> >>>>>> discussed and I missed it): what is the motivation for not using the
> >>>>>> offset as a tie breaker in the "timestamp" case? Isn't this
> >>>>>> inconsistent behavior?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>> On 5/2/18 2:07 PM, Guozhang Wang wrote:
> >>>>>>> Hello Luís,
> >>>>>>>
> >>>>>>> Sorry for the late reply.
> >>>>>>>
> >>>>>>> My understanding is that such duplicates will only happen if the
> >>>>>>> non-offset version value, either the timestamp or some long-typed
> >>>>>>> header key, is the same (i.e. we cannot break ties).
> >>>>>>>
> >>>>>>> 1. For the timestamp, which is in milliseconds, I think in practice
> >>>>>>> the likelihood of records with the same key and the same millisecond
> >>>>>>> timestamp is very small, and hence the number of duplicates should
> >>>>>>> be very small.
> >>>>>>>
> >>>>>>> 2. For the long-typed header key, it is arguably out of Kafka's
> >>>>>>> control, and indeed users may (mistakenly) generate many records
> >>>>>>> with the same key and the same header value.
> >>>>>>>
> >>>>>>>
> >>>>>>> So I'd like to propose a counter-offer: for 1), we still use only 8
> >>>>>>> bytes and allow for potential duplicates due to ties; for 2) we use
> >>>>>>> 16 bytes to always break ties. The motivation for distinguishing 1)
> >>>>>>> and 2) is that I expect 1) to be much more common, and hence worth
> >>>>>>> special handling to make cleaning more effective. WDYT?
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, May 2, 2018 at 2:36 AM, Luís Cabral <lu...@yahoo.com.invalid>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> Have you managed to have a look at my reply?
> >>>>>>>> How do you feel about this?
> >>>>>>>>
> >>>>>>>> Kind Regards,
> >>>>>>>> Luís Cabral
> >>>>>>>>
> >>>>>>>> On Monday, April 30, 2018, 9:27:15 AM GMT+2, Luís Cabral
> >>>>>>>> <luis_cabral@yahoo.com> wrote:
> >>>>>>>>
> >>>>>>>>   Hi Guozhang,
> >>>>>>>>
> >>>>>>>> I understand the argument, but this is a hazardous compromise for
> >>>>>>>> using Kafka as an event store (as is my original intention).
> >>>>>>>>
> >>>>>>>> I expect to have many duplicated messages in Kafka, as the overall
> >>>>>>>> architecture being used allows for the producer to re-send a fresh
> >>>>>>>> state of the backed data into Kafka. This scenario is not common,
> >>>>>>>> as the intention is for Kafka to bear the weight of replaying all
> >>>>>>>> the records for new consumers, but it will occasionally happen.
> >>>>>>>>
> >>>>>>>> As there are plenty of records which are not updated frequently,
> >>>>>>>> this would leave the topic with a surplus of quite a few million
> >>>>>>>> duplicate records (and increasing every time the above mentioned
> >>>>>>>> function is applied).
> >>>>>>>>
> >>>>>>>> I would rather have the temporary memory footprint of 8 bytes per
> >>>>>>>> record whenever the compaction is run (only when not in 'offset'
> >>>>>>>> mode) than allow the topic to run into this state.
> >>>>>>>>
> >>>>>>>> What do you think? Is this scenario too specific to me, or do you
> >>>>>>>> believe that it could happen to other clients as well?
> >>>>>>>>
> >>>>>>>> Thanks again for the continued discussion!
> >>>>>>>> Cheers,
> >>>>>>>> Luis
> >>>>>>>>
> >>>>>>>> On Friday, April 27, 2018, 8:21:13 PM GMT+2, Guozhang Wang
> >>>>>>>> <wangguoz@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hello Luis,
> >>>>>>>>
> >>>>>>>> When comparing the versions returns `equal`, the original proposal
> >>>>>>>> is to use the offset as the tie breaker. My previous comment is that
> >>>>>>>>
> >>>>>>>> 1) when we build the map by calling `put`, if there is already an
> >>>>>>>> entry for the key, we compare its stored version and replace it if
> >>>>>>>> the put record's version is "no smaller than" the stored record's:
> >>>>>>>> this is because when building the map we are always going from
> >>>>>>>> smaller offsets to larger ones.
> >>>>>>>>
> >>>>>>>> 2) when making a second pass to determine whether each record should
> >>>>>>>> be retained based on the map, we do not try to break the tie if the
> >>>>>>>> map's returned version is the same, but always treat it as "keep".
> >>>>>>>> In this case, when we are comparing a record with itself stored in
> >>>>>>>> the offset map, the version comparison would return `equals`. As I
> >>>>>>>> mentioned in the PR, one caveat is that we may indeed have multiple
> >>>>>>>> records with the same key and the same version, but once a new
> >>>>>>>> versioned record is appended they will be deleted.
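
The two passes described above can be sketched roughly as follows. This is
only an illustration of the idea in Python, under the assumption that the map
stores just the version per key; Record, build_version_map and compact are
hypothetical names, not the PR's actual classes or methods.

```python
from collections import namedtuple

# Hypothetical stand-in for a log record; only the fields the thread mentions.
Record = namedtuple("Record", ["key", "offset", "version"])


def build_version_map(records):
    """Pass 1: walk records in ascending offset order and remember, per key,
    the highest version seen ("no smaller than" replaces the stored entry)."""
    latest = {}
    for record in records:
        stored = latest.get(record.key)
        if stored is None or record.version >= stored:
            latest[record.key] = record.version
    return latest


def compact(records):
    """Pass 2: retain a record when its version ties with (or exceeds) the
    map's version; ties are never broken, they are always treated as keep."""
    latest = build_version_map(records)
    return [r for r in records if r.version >= latest[r.key]]


log = [Record("A", 0, 1), Record("A", 1, 1), Record("A", 2, 0)]
after_first_clean = compact(log)

# Appending a record with a newer version removes the tied duplicates.
after_second_clean = compact(after_first_clean + [Record("A", 3, 2)])
```

In this sketch the first clean keeps both version-1 records for key A (the
tie) and drops the version-0 record; the second clean keeps only the record
at offset 3, which matches the caveat described above.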
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Does that make sense?
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Apr 27, 2018 at 4:20 AM, Luís Cabral
> >>>>>>>> <luis_cabral@yahoo.com.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I was updating the PR to match the latest decisions and noticed
> >>>>>>>>> (or rather, the integration tests noticed) that, without storing
> >>>>>>>>> the offset, the cache doesn't know when to keep the record itself.
> >>>>>>>>>
> >>>>>>>>> This is because, after the cache is populated, all the records are
> >>>>>>>>> compared against the stored ones, so
> >>>>>>>>> "Record{key:A,offset:1,version:1}" will compare against itself and
> >>>>>>>>> be flagged as "don't keep", since we only compared based on the
> >>>>>>>>> version and didn't check to see if the offset was the same or not.
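
A small sketch of the problem described above, in Python. The strict
comparison is one reading of the behaviour the integration tests exposed, not
a copy of the PR's code; Record and both helper functions are hypothetical
names used only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a log record.
Record = namedtuple("Record", ["key", "offset", "version"])


def keep_version_only(record, stored_version):
    """Cache stores only the version: a record is kept only if it is strictly
    newer than the cached entry, so a record compared with itself is dropped."""
    return record.version > stored_version


def keep_with_offset(record, stored):
    """Cache stores (version, offset): the offset breaks version ties, so the
    record that populated the entry recognises itself and is kept."""
    stored_version, stored_offset = stored
    return (record.version, record.offset) >= (stored_version, stored_offset)


record = Record("A", 1, 1)          # Record{key:A,offset:1,version:1}
older_duplicate = Record("A", 0, 1)

dropped_by_mistake = not keep_version_only(record, 1)
kept_with_tie_breaker = keep_with_offset(record, (1, 1))
duplicate_removed = not keep_with_offset(older_duplicate, (1, 1))
```

With the offset stored next to the version, the latest record survives and an
older record with the same key and version is cleaned, at the cost of the
extra 8 bytes per entry discussed in this thread.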
> >>>>>>>>>
> >>>>>>>>> This sort of invalidates not storing the offset in the cache,
> >>>>>>>>> unfortunately, and the binary footprint increases two-fold when
> >>>>>>>>> "offset" is not used as a compaction strategy.
> >>>>>>>>>
> >>>>>>>>> Guozhang: Is it ok with you if we go back on this decision and
> >>>>>>>>> leave the offset as a tie-breaker?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Kind Regards,
> >>>>>>>>> Luis
> >>>>>>>>>
> >>>>>>>>>   On Friday, April 27, 2018, 11:11:55 AM GMT+2, Luís Cabral
> >>>>>>>>> <lu...@yahoo.com.INVALID> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> The KIP is now updated with the results of the byte array
> >>>>>>>>> discussion.
> >>>>>>>>>
> >>>>>>>>> This is my first contribution to Kafka, so I'm not sure what the
> >>>>>>>>> processes are. Is it now acceptable to take this to a vote, or
> >>>>>>>>> should I ask for more contributors to join the discussion first?
> >>>>>>>>>
> >>>>>>>>> Kind Regards,
> >>>>>>>>> Luis
> >>>>>>>>>
> >>>>>>>>> On Friday, April 27, 2018, 6:12:03 AM GMT+2, Guozhang Wang
> >>>>>>>>> <wa...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello Luís,
> >>>>>>>>>
> >>>>>>>>>> Offset is an integer? I've only noticed it being resolved as a
> >>>>>>>>>> long so far.
> >>>>>>>>>
> >>>>>>>>> You are right, offset is a long.
> >>>>>>>>>
> >>>>>>>>> As for timestamp / other types, I left a comment in your PR about
> >>>>>>>>> handling tie breakers.
> >>>>>>>>>
> >>>>>>>>>> Given these arguments, is this point something that you
> >>>>>>>>>> absolutely must have?
> >>>>>>>>>
> >>>>>>>>> No, I do not have a strong use case in mind for arbitrary byte
> >>>>>>>>> arrays; I was just thinking that if we are going to enhance log
> >>>>>>>>> compaction, why not generalize it more :)
> >>>>>>>>>
> >>>>>>>>> Your concern about the memory usage makes sense. I'm happy to take
> >>>>>>>>> my suggestion back and enforce only long-typed fields.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 26, 2018 at 1:44 AM, Luís Cabral
> >>>>>>>>> <luis_cabral@yahoo.com.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> bq. have a integer typed OffsetMap (for offset)
> >>>>>>>>>>
> >>>>>>>>>> Offset is an integer? I've only noticed it being resolved as a
> >>>>>>>>>> long so far.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> bq. long typed OffsetMap (for timestamp)
> >>>>>>>>>>
> >>>>>>>>>> We would still need to store the offset, as it is functioning as
> >>>>>>>>>> a tie-breaker. Not that this is a big deal, we can easily have
> >>>>>>>>>> both (as currently done in the PR).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> bq. For the byte array typed offset map, we can use a general
> >>>>>>>>>> hashmap, where the hashmap's CAPACITY will be reasoned from the
> >>>>>>>>>> given "val memory: Int" parameter
> >>>>>>>>>>
> >>>>>>>>>> If you have a map with 128 byte capacity, then store a value with
> >>>>>>>>>> 16 bytes and another with 32 bytes, how many free slots do you
> >>>>>>>>>> have left in this map?
> >>>>>>>>>>
> >>>>>>>>>> You can make this work, but I think you would need to re-design
> >>>>>>>>>> the whole log cleaner approach, which implies changing some of
> >>>>>>>>>> the already existing configurations (like
> >>>>>>>>>> "log.cleaner.io.buffer.load.factor"). I would rather maintain
> >>>>>>>>>> backwards compatibility as much as possible in this KIP, and if
> >>>>>>>>>> this means that using "foo" / "bar" or "2.1-a" / "3.20-b" as
> >>>>>>>>>> record versions is not viable, then so be it.
> >>>>>>>>>>
> >>>>>>>>>> Given these arguments, is this point something that you
> >>>>>>>>>> absolutely must have? I'm still sort of hoping that you are just
> >>>>>>>>>> entertaining the idea and are ok with having a long (now conceded
> >>>>>>>>>> to be unsigned, so the byte arrays can be compared directly).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Kind Regards,
> >>>>>>>>>> Luis
> >>>>>>>>>>


-- 
-- Guozhang