You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Will Funnell <w....@gmail.com> on 2015/04/14 12:27:22 UTC

Re: Consuming a snapshot from log compacted topic

Hi,

Any update on the above patch?

Hoping you might be able to review it soon.

Thanks.



On 23 February 2015 at 21:21, Will Funnell <w....@gmail.com> wrote:

> Hey guys,
>
> I created a patch based on your feedback.
>
> Let me know what you think.
>
> https://issues.apache.org/jira/browse/KAFKA-1977
>
> On 20 February 2015 at 01:43, Joel Koshy <jj...@gmail.com> wrote:
>
>> The log end offset (of a partition) changes when messages are appended
>> to the partition. (It is not correlated with the consumer's offset).
>>
>>
>> On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
>> > So at what point does the log end offset change? When you commit?
>> >
>> > On 19 February 2015 at 18:47, Joel Koshy <jj...@gmail.com> wrote:
>> >
>> > > > If I consumed up to the log end offset and log compaction happens in
>> > > > between, I would have missed some messages.
>> > >
>> > > Compaction actually only runs on the rolled over segments (not the
>> > > active - i.e., latest segment). The log-end-offset will be in the
>> > > latest segment which does not participate in compaction.
>> > >
>> > > > > The log end offset is just the end of the committed messages in
>> the log
>> > > > > (the last thing the consumer has access to). It isn't the same as
>> the
>> > > > > cleaner point but is always later than it so it would work just as
>> > > well.
>> > > >
>> > > > Isn't this just roughly the same value as using c.getOffsetsBefore()
>> > > with a
>> > > > partitionRequestTime of -1?
>> > > >
>> > > >
>> > > > Although its always later than the cleaner point, surely log
>> compaction
>> > > is
>> > > > still an issue here.
>> > > >
>> > > > If I consumed up to the log end offset and log compaction happens in
>> > > > between, I would have missed some messages.
>> > > >
>> > > >
>> > > > My thinking was that if you knew the log cleaner point, you could:
>> > > >
>> > > > Make a note of the starting offset
>> > > > Consume till end of log
>> > > > Check my starting point is ahead of current cleaner point, otherwise
>> > > loop.
>> > > >
>> > > >
>> > > > I appreciate there is a chance I misunderstood your point.
>> > > >
>> > > > On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com>
>> wrote:
>> > > >
>> > > > > The log end offset is just the end of the committed messages in
>> the log
>> > > > > (the last thing the consumer has access to). It isn't the same as
>> the
>> > > > > cleaner point but is always later than it so it would work just as
>> > > well.
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <
>> w.f.funnell@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think
>> it is
>> > > > > > > along the lines of: we expose the log-end-offset (actually
>> the high
>> > > > > > > watermark) of the partition in the fetch response. However,
>> this is
>> > > > > > > not exposed to the consumer (either in the new ConsumerRecord
>> class
>> > > > > > > or the existing MessageAndMetadata class). If we did, then if
>> you
>> > > > > > > were to consume a record you can check that it has offsets up
>> to
>> > > the
>> > > > > > > log-end offset. If it does then you would know for sure that
>> you
>> > > have
>> > > > > > > consumed everything for that partition
>> > > > > >
>> > > > > > To confirm then, the log-end-offset is the same as the cleaner
>> point?
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com>
>> wrote:
>> > > > > >
>> > > > > > > Yeah I was thinking either along the lines Joel was
>> suggesting or
>> > > else
>> > > > > > > adding a logEndOffset(TopicPartition) method or something like
>> > > that. As
>> > > > > > > Joel says the consumer actually has this information
>> internally (we
>> > > > > > return
>> > > > > > > it with the fetch request) but doesn't expose it.
>> > > > > > >
>> > > > > > > -Jay
>> > > > > > >
>> > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <
>> jjkoshy.w@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > > > 2. Make the log end offset available more easily in the
>> > > consumer.
>> > > > > > > > >
>> > > > > > > > > Was thinking something would need to be added in
>> > > LogCleanerManager,
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > updateCheckpoints function. Where would be best to
>> publish the
>> > > > > > > > information
>> > > > > > > > > to make it more easily available, or would you just
>> expose the
>> > > > > > > > > offset-cleaner-checkpoint file as it is?
>> > > > > > > > > Is it right you would also need to know which
>> > > > > > offset-cleaner-checkpoint
>> > > > > > > > > entry related to each active partition?
>> > > > > > > >
>> > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I
>> think it
>> > > is
>> > > > > > > > along the lines of: we expose the log-end-offset (actually
>> the
>> > > high
>> > > > > > > > watermark) of the partition in the fetch response. However,
>> this
>> > > is
>> > > > > > > > not exposed to the consumer (either in the new
>> ConsumerRecord
>> > > class
>> > > > > > > > or the existing MessageAndMetadata class). If we did, then
>> if you
>> > > > > > > > were to consume a record you can check that it has offsets
>> up to
>> > > the
>> > > > > > > > log-end offset. If it does then you would know for sure
>> that you
>> > > have
>> > > > > > > > consumed everything for that partition.
>> > > > > > > >
>> > > > > > > > > Yes, was looking at this initially, but as we have 100-150
>> > > writes
>> > > > > per
>> > > > > > > > > second, it could be a while before there is a pause long
>> > > enough to
>> > > > > > > check
>> > > > > > > > it
>> > > > > > > > > has caught up. Even with the consumer timeout set to -1,
>> it
>> > > takes
>> > > > > > some
>> > > > > > > > time
>> > > > > > > > > to query the max offset values, which is still long
>> enough for
>> > > more
>> > > > > > > > > messages to arrive.
>> > > > > > > >
>> > > > > > > > Got it - thanks for clarifying.
>> > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy <
>> jjkoshy.w@gmail.com>
>> > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice that if
>> you
>> > > check
>> > > > > > the
>> > > > > > > > end
>> > > > > > > > > > of
>> > > > > > > > > > > the log then begin consuming and read up to that point
>> > > > > compaction
>> > > > > > > may
>> > > > > > > > > > have
>> > > > > > > > > > > already kicked in (if the reading takes a while) and
>> hence
>> > > you
>> > > > > > > might
>> > > > > > > > have
>> > > > > > > > > > > an incomplete snapshot.
>> > > > > > > > > >
>> > > > > > > > > > Isn't it sufficient to just repeat the check at the end
>> after
>> > > > > > reading
>> > > > > > > > > > the log and repeat until you are truly done? At least
>> for the
>> > > > > > > purposes
>> > > > > > > > > > of a snapshot?
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps
>> wrote:
>> > > > > > > > > > > If you catch up off a compacted topic and keep
>> consuming
>> > > then
>> > > > > you
>> > > > > > > > will
>> > > > > > > > > > > become consistent with the log.
>> > > > > > > > > > >
>> > > > > > > > > > > I think what you are saying is that you want to
>> create a
>> > > > > snapshot
>> > > > > > > > from
>> > > > > > > > > > the
>> > > > > > > > > > > Kafka topic but NOT do continual reads after that
>> point.
>> > > For
>> > > > > > > example
>> > > > > > > > you
>> > > > > > > > > > > might be creating a backup of the data to a file.
>> > > > > > > > > > >
>> > > > > > > > > > > I agree that this isn't as easy as it could be. As
>> you say
>> > > the
>> > > > > > only
>> > > > > > > > > > > solution we have is that timeout which doesn't
>> > > differentiate
>> > > > > > > between
>> > > > > > > > GC
>> > > > > > > > > > > stall in your process and no more messages left so you
>> > > would
>> > > > > need
>> > > > > > > to
>> > > > > > > > tune
>> > > > > > > > > > > the timeout. This is admittedly kind of a hack.
>> > > > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice that if
>> you
>> > > check
>> > > > > > the
>> > > > > > > > end
>> > > > > > > > > > of
>> > > > > > > > > > > the log then begin consuming and read up to that point
>> > > > > compaction
>> > > > > > > may
>> > > > > > > > > > have
>> > > > > > > > > > > already kicked in (if the reading takes a while) and
>> hence
>> > > you
>> > > > > > > might
>> > > > > > > > have
>> > > > > > > > > > > an incomplete snapshot.
>> > > > > > > > > > >
>> > > > > > > > > > > I think there are two features we could add that
>> would make
>> > > > > this
>> > > > > > > > easier:
>> > > > > > > > > > > 1. Make the cleaner point configurable on a per-topic
>> > > basis.
>> > > > > This
>> > > > > > > > feature
>> > > > > > > > > > > would allow you to control how long the full log is
>> > > retained
>> > > > > and
>> > > > > > > when
>> > > > > > > > > > > compaction can kick in. This would give a
>> configurable SLA
>> > > for
>> > > > > > the
>> > > > > > > > reader
>> > > > > > > > > > > process to catch up.
>> > > > > > > > > > > 2. Make the log end offset available more easily in
>> the
>> > > > > consumer.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
>> > > > > > > > w.f.funnell@gmail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log
>> compaction
>> > > in
>> > > > > > order
>> > > > > > > > to
>> > > > > > > > > > > > provide streams of messages to our clients.
>> > > > > > > > > > > >
>> > > > > > > > > > > > As well as constantly consuming the stream, one of
>> our
>> > > use
>> > > > > > cases
>> > > > > > > > is to
>> > > > > > > > > > > > provide a snapshot, meaning the user will receive a
>> copy
>> > > of
>> > > > > > every
>> > > > > > > > > > message
>> > > > > > > > > > > > at least once.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Each one of these messages represents an item of
>> content
>> > > in
>> > > > > our
>> > > > > > > > system.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > The problem comes when determining if the client has
>> > > actually
>> > > > > > > > reached
>> > > > > > > > > > the
>> > > > > > > > > > > > end of the topic.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The standard Kafka way of dealing with this seems
>> to be
>> > > by
>> > > > > > using
>> > > > > > > a
>> > > > > > > > > > > > ConsumerTimeoutException, but we are frequently
>> getting
>> > > this
>> > > > > > > error
>> > > > > > > > > > when the
>> > > > > > > > > > > > end of the topic has not been reached or even it may
>> > > take a
>> > > > > > long
>> > > > > > > > time
>> > > > > > > > > > > > before a timeout naturally occurs.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On first glance it would seem possible to do a
>> lookup
>> > > for the
>> > > > > > max
>> > > > > > > > > > offset
>> > > > > > > > > > > > for each partition when you begin consuming,
>> stopping
>> > > when
>> > > > > this
>> > > > > > > > > > position it
>> > > > > > > > > > > > reached.
>> > > > > > > > > > > >
>> > > > > > > > > > > > But log compaction means that if an update to a
>> piece of
>> > > > > > content
>> > > > > > > > > > arrives
>> > > > > > > > > > > > with the same message key, then this will be
>> written to
>> > > the
>> > > > > end
>> > > > > > > so
>> > > > > > > > the
>> > > > > > > > > > > > snapshot will be incomplete.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Another thought is to make use of the cleaner point.
>> > > > > Currently
>> > > > > > > > Kafka
>> > > > > > > > > > writes
>> > > > > > > > > > > > out to a "cleaner-offset-checkpoint" file in each
>> data
>> > > > > > directory
>> > > > > > > > which
>> > > > > > > > > > is
>> > > > > > > > > > > > written to after log compaction completes.
>> > > > > > > > > > > >
>> > > > > > > > > > > > If the consumer was able to access the
>> > > > > > cleaner-offset-checkpoint
>> > > > > > > > you
>> > > > > > > > > > would
>> > > > > > > > > > > > be able to consume up to this point, check the
>> point was
>> > > > > still
>> > > > > > > the
>> > > > > > > > > > same,
>> > > > > > > > > > > > and compaction had not yet occurred, and therefore
>> > > determine
>> > > > > > you
>> > > > > > > > had
>> > > > > > > > > > > > receive everything at least once. (Assuming there
>> was no
>> > > race
>> > > > > > > > condition
>> > > > > > > > > > > > between compaction and writing to the file)
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Has anybody got any thoughts?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Will
>> > > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > Will Funnell
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Will Funnell
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Will Funnell
>> > >
>> > >
>> >
>> >
>> > --
>> > Will Funnell
>>
>>
>
>
> --
> Will Funnell
>



-- 
Will Funnell