You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Will Funnell <w....@gmail.com> on 2015/02/18 19:18:34 UTC

Consuming a snapshot from log compacted topic

We are currently using Kafka 0.8.1.1 with log compaction in order to
provide streams of messages to our clients.

As well as constantly consuming the stream, one of our use cases is to
provide a snapshot, meaning the user will receive a copy of every message
at least once.

Each one of these messages represents an item of content in our system.


The problem comes when determining if the client has actually reached the
end of the topic.

The standard Kafka way of dealing with this seems to be by using a
ConsumerTimeoutException, but we are frequently getting this error when the
end of the topic has not been reached or even it may take a long time
before a timeout naturally occurs.


On first glance it would seem possible to do a lookup for the max offset
for each partition when you begin consuming, stopping when this position it
reached.

But log compaction means that if an update to a piece of content arrives
with the same message key, then this will be written to the end so the
snapshot will be incomplete.


Another thought is to make use of the cleaner point. Currently Kafka writes
out to a "cleaner-offset-checkpoint" file in each data directory which is
written to after log compaction completes.

If the consumer was able to access the cleaner-offset-checkpoint you would
be able to consume up to this point, check the point was still the same,
and compaction had not yet occurred, and therefore determine you had
receive everything at least once. (Assuming there was no race condition
between compaction and writing to the file)


Has anybody got any thoughts?

Will

Re: Consuming a snapshot from log compacted topic

Posted by svante karlsson <sa...@csi.se>.
Do you have to separate the snapshot from the "normal" update flow.

I've used a compacting kafka topic as the source of truth to a solr
database and fed the topic both with real time updates and "snapshots" from
a hive job. This worked very well. The nice point is that there is a
seamless transition between loading an initial state and after that
continuing with incremental updates. Thus no need to actually tell whether
you are at the end of the stream or not. (Just our normal monitor the "lag"
problem...)



2015-02-18 19:18 GMT+01:00 Will Funnell <w....@gmail.com>:

> We are currently using Kafka 0.8.1.1 with log compaction in order to
> provide streams of messages to our clients.
>
> As well as constantly consuming the stream, one of our use cases is to
> provide a snapshot, meaning the user will receive a copy of every message
> at least once.
>
> Each one of these messages represents an item of content in our system.
>
>
> The problem comes when determining if the client has actually reached the
> end of the topic.
>
> The standard Kafka way of dealing with this seems to be by using a
> ConsumerTimeoutException, but we are frequently getting this error when the
> end of the topic has not been reached or even it may take a long time
> before a timeout naturally occurs.
>
>
> On first glance it would seem possible to do a lookup for the max offset
> for each partition when you begin consuming, stopping when this position it
> reached.
>
> But log compaction means that if an update to a piece of content arrives
> with the same message key, then this will be written to the end so the
> snapshot will be incomplete.
>
>
> Another thought is to make use of the cleaner point. Currently Kafka writes
> out to a "cleaner-offset-checkpoint" file in each data directory which is
> written to after log compaction completes.
>
> If the consumer was able to access the cleaner-offset-checkpoint you would
> be able to consume up to this point, check the point was still the same,
> and compaction had not yet occurred, and therefore determine you had
> receive everything at least once. (Assuming there was no race condition
> between compaction and writing to the file)
>
>
> Has anybody got any thoughts?
>
> Will
>

Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
Hi,

Any update on the above patch?

Hoping you might be able to review it soon.

Thanks.



On 23 February 2015 at 21:21, Will Funnell <w....@gmail.com> wrote:

> Hey guys,
>
> I created a patch based on your feedback.
>
> Let me know what you think.
>
> https://issues.apache.org/jira/browse/KAFKA-1977
>
> On 20 February 2015 at 01:43, Joel Koshy <jj...@gmail.com> wrote:
>
>> The log end offset (of a partition) changes when messages are appended
>> to the partition. (It is not correlated with the consumer's offset).
>>
>>
>> On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
>> > So at what point does the log end offset change? When you commit?
>> >
>> > On 19 February 2015 at 18:47, Joel Koshy <jj...@gmail.com> wrote:
>> >
>> > > > If I consumed up to the log end offset and log compaction happens in
>> > > > between, I would have missed some messages.
>> > >
>> > > Compaction actually only runs on the rolled over segments (not the
>> > > active - i.e., latest segment). The log-end-offset will be in the
>> > > latest segment which does not participate in compaction.
>> > >
>> > > > > The log end offset is just the end of the committed messages in
>> the log
>> > > > > (the last thing the consumer has access to). It isn't the same as
>> the
>> > > > > cleaner point but is always later than it so it would work just as
>> > > well.
>> > > >
>> > > > Isn't this just roughly the same value as using c.getOffsetsBefore()
>> > > with a
>> > > > partitionRequestTime of -1?
>> > > >
>> > > >
>> > > > Although its always later than the cleaner point, surely log
>> compaction
>> > > is
>> > > > still an issue here.
>> > > >
>> > > > If I consumed up to the log end offset and log compaction happens in
>> > > > between, I would have missed some messages.
>> > > >
>> > > >
>> > > > My thinking was that if you knew the log cleaner point, you could:
>> > > >
>> > > > Make a note of the starting offset
>> > > > Consume till end of log
>> > > > Check my starting point is ahead of current cleaner point, otherwise
>> > > loop.
>> > > >
>> > > >
>> > > > I appreciate there is a chance I misunderstood your point.
>> > > >
>> > > > On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com>
>> wrote:
>> > > >
>> > > > > The log end offset is just the end of the committed messages in
>> the log
>> > > > > (the last thing the consumer has access to). It isn't the same as
>> the
>> > > > > cleaner point but is always later than it so it would work just as
>> > > well.
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <
>> w.f.funnell@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think
>> it is
>> > > > > > > along the lines of: we expose the log-end-offset (actually
>> the high
>> > > > > > > watermark) of the partition in the fetch response. However,
>> this is
>> > > > > > > not exposed to the consumer (either in the new ConsumerRecord
>> class
>> > > > > > > or the existing MessageAndMetadata class). If we did, then if
>> you
>> > > > > > > were to consume a record you can check that it has offsets up
>> to
>> > > the
>> > > > > > > log-end offset. If it does then you would know for sure that
>> you
>> > > have
>> > > > > > > consumed everything for that partition
>> > > > > >
>> > > > > > To confirm then, the log-end-offset is the same as the cleaner
>> point?
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com>
>> wrote:
>> > > > > >
>> > > > > > > Yeah I was thinking either along the lines Joel was
>> suggesting or
>> > > else
>> > > > > > > adding a logEndOffset(TopicPartition) method or something like
>> > > that. As
>> > > > > > > Joel says the consumer actually has this information
>> internally (we
>> > > > > > return
>> > > > > > > it with the fetch request) but doesn't expose it.
>> > > > > > >
>> > > > > > > -Jay
>> > > > > > >
>> > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <
>> jjkoshy.w@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > > > 2. Make the log end offset available more easily in the
>> > > consumer.
>> > > > > > > > >
>> > > > > > > > > Was thinking something would need to be added in
>> > > LogCleanerManager,
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > updateCheckpoints function. Where would be best to
>> publish the
>> > > > > > > > information
>> > > > > > > > > to make it more easily available, or would you just
>> expose the
>> > > > > > > > > offset-cleaner-checkpoint file as it is?
>> > > > > > > > > Is it right you would also need to know which
>> > > > > > offset-cleaner-checkpoint
>> > > > > > > > > entry related to each active partition?
>> > > > > > > >
>> > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I
>> think it
>> > > is
>> > > > > > > > along the lines of: we expose the log-end-offset (actually
>> the
>> > > high
>> > > > > > > > watermark) of the partition in the fetch response. However,
>> this
>> > > is
>> > > > > > > > not exposed to the consumer (either in the new
>> ConsumerRecord
>> > > class
>> > > > > > > > or the existing MessageAndMetadata class). If we did, then
>> if you
>> > > > > > > > were to consume a record you can check that it has offsets
>> up to
>> > > the
>> > > > > > > > log-end offset. If it does then you would know for sure
>> that you
>> > > have
>> > > > > > > > consumed everything for that partition.
>> > > > > > > >
>> > > > > > > > > Yes, was looking at this initially, but as we have 100-150
>> > > writes
>> > > > > per
>> > > > > > > > > second, it could be a while before there is a pause long
>> > > enough to
>> > > > > > > check
>> > > > > > > > it
>> > > > > > > > > has caught up. Even with the consumer timeout set to -1,
>> it
>> > > takes
>> > > > > > some
>> > > > > > > > time
>> > > > > > > > > to query the max offset values, which is still long
>> enough for
>> > > more
>> > > > > > > > > messages to arrive.
>> > > > > > > >
>> > > > > > > > Got it - thanks for clarifying.
>> > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy <
>> jjkoshy.w@gmail.com>
>> > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice that if
>> you
>> > > check
>> > > > > > the
>> > > > > > > > end
>> > > > > > > > > > of
>> > > > > > > > > > > the log then begin consuming and read up to that point
>> > > > > compaction
>> > > > > > > may
>> > > > > > > > > > have
>> > > > > > > > > > > already kicked in (if the reading takes a while) and
>> hence
>> > > you
>> > > > > > > might
>> > > > > > > > have
>> > > > > > > > > > > an incomplete snapshot.
>> > > > > > > > > >
>> > > > > > > > > > Isn't it sufficient to just repeat the check at the end
>> after
>> > > > > > reading
>> > > > > > > > > > the log and repeat until you are truly done? At least
>> for the
>> > > > > > > purposes
>> > > > > > > > > > of a snapshot?
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps
>> wrote:
>> > > > > > > > > > > If you catch up off a compacted topic and keep
>> consuming
>> > > then
>> > > > > you
>> > > > > > > > will
>> > > > > > > > > > > become consistent with the log.
>> > > > > > > > > > >
>> > > > > > > > > > > I think what you are saying is that you want to
>> create a
>> > > > > snapshot
>> > > > > > > > from
>> > > > > > > > > > the
>> > > > > > > > > > > Kafka topic but NOT do continual reads after that
>> point.
>> > > For
>> > > > > > > example
>> > > > > > > > you
>> > > > > > > > > > > might be creating a backup of the data to a file.
>> > > > > > > > > > >
>> > > > > > > > > > > I agree that this isn't as easy as it could be. As
>> you say
>> > > the
>> > > > > > only
>> > > > > > > > > > > solution we have is that timeout which doesn't
>> > > differentiate
>> > > > > > > between
>> > > > > > > > GC
>> > > > > > > > > > > stall in your process and no more messages left so you
>> > > would
>> > > > > need
>> > > > > > > to
>> > > > > > > > tune
>> > > > > > > > > > > the timeout. This is admittedly kind of a hack.
>> > > > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice that if
>> you
>> > > check
>> > > > > > the
>> > > > > > > > end
>> > > > > > > > > > of
>> > > > > > > > > > > the log then begin consuming and read up to that point
>> > > > > compaction
>> > > > > > > may
>> > > > > > > > > > have
>> > > > > > > > > > > already kicked in (if the reading takes a while) and
>> hence
>> > > you
>> > > > > > > might
>> > > > > > > > have
>> > > > > > > > > > > an incomplete snapshot.
>> > > > > > > > > > >
>> > > > > > > > > > > I think there are two features we could add that
>> would make
>> > > > > this
>> > > > > > > > easier:
>> > > > > > > > > > > 1. Make the cleaner point configurable on a per-topic
>> > > basis.
>> > > > > This
>> > > > > > > > feature
>> > > > > > > > > > > would allow you to control how long the full log is
>> > > retained
>> > > > > and
>> > > > > > > when
>> > > > > > > > > > > compaction can kick in. This would give a
>> configurable SLA
>> > > for
>> > > > > > the
>> > > > > > > > reader
>> > > > > > > > > > > process to catch up.
>> > > > > > > > > > > 2. Make the log end offset available more easily in
>> the
>> > > > > consumer.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
>> > > > > > > > w.f.funnell@gmail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log
>> compaction
>> > > in
>> > > > > > order
>> > > > > > > > to
>> > > > > > > > > > > > provide streams of messages to our clients.
>> > > > > > > > > > > >
>> > > > > > > > > > > > As well as constantly consuming the stream, one of
>> our
>> > > use
>> > > > > > cases
>> > > > > > > > is to
>> > > > > > > > > > > > provide a snapshot, meaning the user will receive a
>> copy
>> > > of
>> > > > > > every
>> > > > > > > > > > message
>> > > > > > > > > > > > at least once.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Each one of these messages represents an item of
>> content
>> > > in
>> > > > > our
>> > > > > > > > system.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > The problem comes when determining if the client has
>> > > actually
>> > > > > > > > reached
>> > > > > > > > > > the
>> > > > > > > > > > > > end of the topic.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The standard Kafka way of dealing with this seems
>> to be
>> > > by
>> > > > > > using
>> > > > > > > a
>> > > > > > > > > > > > ConsumerTimeoutException, but we are frequently
>> getting
>> > > this
>> > > > > > > error
>> > > > > > > > > > when the
>> > > > > > > > > > > > end of the topic has not been reached or even it may
>> > > take a
>> > > > > > long
>> > > > > > > > time
>> > > > > > > > > > > > before a timeout naturally occurs.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On first glance it would seem possible to do a
>> lookup
>> > > for the
>> > > > > > max
>> > > > > > > > > > offset
>> > > > > > > > > > > > for each partition when you begin consuming,
>> stopping
>> > > when
>> > > > > this
>> > > > > > > > > > position it
>> > > > > > > > > > > > reached.
>> > > > > > > > > > > >
>> > > > > > > > > > > > But log compaction means that if an update to a
>> piece of
>> > > > > > content
>> > > > > > > > > > arrives
>> > > > > > > > > > > > with the same message key, then this will be
>> written to
>> > > the
>> > > > > end
>> > > > > > > so
>> > > > > > > > the
>> > > > > > > > > > > > snapshot will be incomplete.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Another thought is to make use of the cleaner point.
>> > > > > Currently
>> > > > > > > > Kafka
>> > > > > > > > > > writes
>> > > > > > > > > > > > out to a "cleaner-offset-checkpoint" file in each
>> data
>> > > > > > directory
>> > > > > > > > which
>> > > > > > > > > > is
>> > > > > > > > > > > > written to after log compaction completes.
>> > > > > > > > > > > >
>> > > > > > > > > > > > If the consumer was able to access the
>> > > > > > cleaner-offset-checkpoint
>> > > > > > > > you
>> > > > > > > > > > would
>> > > > > > > > > > > > be able to consume up to this point, check the
>> point was
>> > > > > still
>> > > > > > > the
>> > > > > > > > > > same,
>> > > > > > > > > > > > and compaction had not yet occurred, and therefore
>> > > determine
>> > > > > > you
>> > > > > > > > had
>> > > > > > > > > > > > receive everything at least once. (Assuming there
>> was no
>> > > race
>> > > > > > > > condition
>> > > > > > > > > > > > between compaction and writing to the file)
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Has anybody got any thoughts?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Will
>> > > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > Will Funnell
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Will Funnell
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Will Funnell
>> > >
>> > >
>> >
>> >
>> > --
>> > Will Funnell
>>
>>
>
>
> --
> Will Funnell
>



-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
Hey guys,

I created a patch based on your feedback.

Let me know what you think.

https://issues.apache.org/jira/browse/KAFKA-1977

On 20 February 2015 at 01:43, Joel Koshy <jj...@gmail.com> wrote:

> The log end offset (of a partition) changes when messages are appended
> to the partition. (It is not correlated with the consumer's offset).
>
>
> On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
> > So at what point does the log end offset change? When you commit?
> >
> > On 19 February 2015 at 18:47, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > > If I consumed up to the log end offset and log compaction happens in
> > > > between, I would have missed some messages.
> > >
> > > Compaction actually only runs on the rolled over segments (not the
> > > active - i.e., latest segment). The log-end-offset will be in the
> > > latest segment which does not participate in compaction.
> > >
> > > > > The log end offset is just the end of the committed messages in
> the log
> > > > > (the last thing the consumer has access to). It isn't the same as
> the
> > > > > cleaner point but is always later than it so it would work just as
> > > well.
> > > >
> > > > Isn't this just roughly the same value as using c.getOffsetsBefore()
> > > with a
> > > > partitionRequestTime of -1?
> > > >
> > > >
> > > > Although its always later than the cleaner point, surely log
> compaction
> > > is
> > > > still an issue here.
> > > >
> > > > If I consumed up to the log end offset and log compaction happens in
> > > > between, I would have missed some messages.
> > > >
> > > >
> > > > My thinking was that if you knew the log cleaner point, you could:
> > > >
> > > > Make a note of the starting offset
> > > > Consume till end of log
> > > > Check my starting point is ahead of current cleaner point, otherwise
> > > loop.
> > > >
> > > >
> > > > I appreciate there is a chance I misunderstood your point.
> > > >
> > > > On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com> wrote:
> > > >
> > > > > The log end offset is just the end of the committed messages in
> the log
> > > > > (the last thing the consumer has access to). It isn't the same as
> the
> > > > > cleaner point but is always later than it so it would work just as
> > > well.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <
> w.f.funnell@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think
> it is
> > > > > > > along the lines of: we expose the log-end-offset (actually the
> high
> > > > > > > watermark) of the partition in the fetch response. However,
> this is
> > > > > > > not exposed to the consumer (either in the new ConsumerRecord
> class
> > > > > > > or the existing MessageAndMetadata class). If we did, then if
> you
> > > > > > > were to consume a record you can check that it has offsets up
> to
> > > the
> > > > > > > log-end offset. If it does then you would know for sure that
> you
> > > have
> > > > > > > consumed everything for that partition
> > > > > >
> > > > > > To confirm then, the log-end-offset is the same as the cleaner
> point?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com>
> wrote:
> > > > > >
> > > > > > > Yeah I was thinking either along the lines Joel was suggesting
> or
> > > else
> > > > > > > adding a logEndOffset(TopicPartition) method or something like
> > > that. As
> > > > > > > Joel says the consumer actually has this information
> internally (we
> > > > > > return
> > > > > > > it with the fetch request) but doesn't expose it.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <
> jjkoshy.w@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > > > 2. Make the log end offset available more easily in the
> > > consumer.
> > > > > > > > >
> > > > > > > > > Was thinking something would need to be added in
> > > LogCleanerManager,
> > > > > > in
> > > > > > > > the
> > > > > > > > > updateCheckpoints function. Where would be best to publish
> the
> > > > > > > > information
> > > > > > > > > to make it more easily available, or would you just expose
> the
> > > > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > > > Is it right you would also need to know which
> > > > > > offset-cleaner-checkpoint
> > > > > > > > > entry related to each active partition?
> > > > > > > >
> > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I
> think it
> > > is
> > > > > > > > along the lines of: we expose the log-end-offset (actually
> the
> > > high
> > > > > > > > watermark) of the partition in the fetch response. However,
> this
> > > is
> > > > > > > > not exposed to the consumer (either in the new ConsumerRecord
> > > class
> > > > > > > > or the existing MessageAndMetadata class). If we did, then
> if you
> > > > > > > > were to consume a record you can check that it has offsets
> up to
> > > the
> > > > > > > > log-end offset. If it does then you would know for sure that
> you
> > > have
> > > > > > > > consumed everything for that partition.
> > > > > > > >
> > > > > > > > > Yes, was looking at this initially, but as we have 100-150
> > > writes
> > > > > per
> > > > > > > > > second, it could be a while before there is a pause long
> > > enough to
> > > > > > > check
> > > > > > > > it
> > > > > > > > > has caught up. Even with the consumer timeout set to -1, it
> > > takes
> > > > > > some
> > > > > > > > time
> > > > > > > > > to query the max offset values, which is still long enough
> for
> > > more
> > > > > > > > > messages to arrive.
> > > > > > > >
> > > > > > > > Got it - thanks for clarifying.
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy <
> jjkoshy.w@gmail.com>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > > You are also correct and perceptive to notice that if
> you
> > > check
> > > > > > the
> > > > > > > > end
> > > > > > > > > > of
> > > > > > > > > > > the log then begin consuming and read up to that point
> > > > > compaction
> > > > > > > may
> > > > > > > > > > have
> > > > > > > > > > > already kicked in (if the reading takes a while) and
> hence
> > > you
> > > > > > > might
> > > > > > > > have
> > > > > > > > > > > an incomplete snapshot.
> > > > > > > > > >
> > > > > > > > > > Isn't it sufficient to just repeat the check at the end
> after
> > > > > > reading
> > > > > > > > > > the log and repeat until you are truly done? At least
> for the
> > > > > > > purposes
> > > > > > > > > > of a snapshot?
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps
> wrote:
> > > > > > > > > > > If you catch up off a compacted topic and keep
> consuming
> > > then
> > > > > you
> > > > > > > > will
> > > > > > > > > > > become consistent with the log.
> > > > > > > > > > >
> > > > > > > > > > > I think what you are saying is that you want to create
> a
> > > > > snapshot
> > > > > > > > from
> > > > > > > > > > the
> > > > > > > > > > > Kafka topic but NOT do continual reads after that
> point.
> > > For
> > > > > > > example
> > > > > > > > you
> > > > > > > > > > > might be creating a backup of the data to a file.
> > > > > > > > > > >
> > > > > > > > > > > I agree that this isn't as easy as it could be. As you
> say
> > > the
> > > > > > only
> > > > > > > > > > > solution we have is that timeout which doesn't
> > > differentiate
> > > > > > > between
> > > > > > > > GC
> > > > > > > > > > > stall in your process and no more messages left so you
> > > would
> > > > > need
> > > > > > > to
> > > > > > > > tune
> > > > > > > > > > > the timeout. This is admittedly kind of a hack.
> > > > > > > > > > >
> > > > > > > > > > > You are also correct and perceptive to notice that if
> you
> > > check
> > > > > > the
> > > > > > > > end
> > > > > > > > > > of
> > > > > > > > > > > the log then begin consuming and read up to that point
> > > > > compaction
> > > > > > > may
> > > > > > > > > > have
> > > > > > > > > > > already kicked in (if the reading takes a while) and
> hence
> > > you
> > > > > > > might
> > > > > > > > have
> > > > > > > > > > > an incomplete snapshot.
> > > > > > > > > > >
> > > > > > > > > > > I think there are two features we could add that would
> make
> > > > > this
> > > > > > > > easier:
> > > > > > > > > > > 1. Make the cleaner point configurable on a per-topic
> > > basis.
> > > > > This
> > > > > > > > feature
> > > > > > > > > > > would allow you to control how long the full log is
> > > retained
> > > > > and
> > > > > > > when
> > > > > > > > > > > compaction can kick in. This would give a configurable
> SLA
> > > for
> > > > > > the
> > > > > > > > reader
> > > > > > > > > > > process to catch up.
> > > > > > > > > > > 2. Make the log end offset available more easily in the
> > > > > consumer.
> > > > > > > > > > >
> > > > > > > > > > > -Jay
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > > > > > > w.f.funnell@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log
> compaction
> > > in
> > > > > > order
> > > > > > > > to
> > > > > > > > > > > > provide streams of messages to our clients.
> > > > > > > > > > > >
> > > > > > > > > > > > As well as constantly consuming the stream, one of
> our
> > > use
> > > > > > cases
> > > > > > > > is to
> > > > > > > > > > > > provide a snapshot, meaning the user will receive a
> copy
> > > of
> > > > > > every
> > > > > > > > > > message
> > > > > > > > > > > > at least once.
> > > > > > > > > > > >
> > > > > > > > > > > > Each one of these messages represents an item of
> content
> > > in
> > > > > our
> > > > > > > > system.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > The problem comes when determining if the client has
> > > actually
> > > > > > > > reached
> > > > > > > > > > the
> > > > > > > > > > > > end of the topic.
> > > > > > > > > > > >
> > > > > > > > > > > > The standard Kafka way of dealing with this seems to
> be
> > > by
> > > > > > using
> > > > > > > a
> > > > > > > > > > > > ConsumerTimeoutException, but we are frequently
> getting
> > > this
> > > > > > > error
> > > > > > > > > > when the
> > > > > > > > > > > > end of the topic has not been reached or even it may
> > > take a
> > > > > > long
> > > > > > > > time
> > > > > > > > > > > > before a timeout naturally occurs.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On first glance it would seem possible to do a lookup
> > > for the
> > > > > > max
> > > > > > > > > > offset
> > > > > > > > > > > > for each partition when you begin consuming, stopping
> > > when
> > > > > this
> > > > > > > > > > position it
> > > > > > > > > > > > reached.
> > > > > > > > > > > >
> > > > > > > > > > > > But log compaction means that if an update to a
> piece of
> > > > > > content
> > > > > > > > > > arrives
> > > > > > > > > > > > with the same message key, then this will be written
> to
> > > the
> > > > > end
> > > > > > > so
> > > > > > > > the
> > > > > > > > > > > > snapshot will be incomplete.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Another thought is to make use of the cleaner point.
> > > > > Currently
> > > > > > > > Kafka
> > > > > > > > > > writes
> > > > > > > > > > > > out to a "cleaner-offset-checkpoint" file in each
> data
> > > > > > directory
> > > > > > > > which
> > > > > > > > > > is
> > > > > > > > > > > > written to after log compaction completes.
> > > > > > > > > > > >
> > > > > > > > > > > > If the consumer was able to access the
> > > > > > cleaner-offset-checkpoint
> > > > > > > > you
> > > > > > > > > > would
> > > > > > > > > > > > be able to consume up to this point, check the point
> was
> > > > > still
> > > > > > > the
> > > > > > > > > > same,
> > > > > > > > > > > > and compaction had not yet occurred, and therefore
> > > determine
> > > > > > you
> > > > > > > > had
> > > > > > > > > > > > receive everything at least once. (Assuming there
> was no
> > > race
> > > > > > > > condition
> > > > > > > > > > > > between compaction and writing to the file)
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Has anybody got any thoughts?
> > > > > > > > > > > >
> > > > > > > > > > > > Will
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Will Funnell
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Will Funnell
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Will Funnell
> > >
> > >
> >
> >
> > --
> > Will Funnell
>
>


-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Joel Koshy <jj...@gmail.com>.
The log end offset (of a partition) changes when messages are appended
to the partition. (It is not correlated with the consumer's offset).


On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
> So at what point does the log end offset change? When you commit?
> 
> On 19 February 2015 at 18:47, Joel Koshy <jj...@gmail.com> wrote:
> 
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> >
> > Compaction actually only runs on the rolled over segments (not the
> > active - i.e., latest segment). The log-end-offset will be in the
> > latest segment which does not participate in compaction.
> >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > >
> > > Isn't this just roughly the same value as using c.getOffsetsBefore()
> > with a
> > > partitionRequestTime of -1?
> > >
> > >
> > > Although its always later than the cleaner point, surely log compaction
> > is
> > > still an issue here.
> > >
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> > >
> > >
> > > My thinking was that if you knew the log cleaner point, you could:
> > >
> > > Make a note of the starting offset
> > > Consume till end of log
> > > Check my starting point is ahead of current cleaner point, otherwise
> > loop.
> > >
> > >
> > > I appreciate there is a chance I misunderstood your point.
> > >
> > > On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w....@gmail.com>
> > > > wrote:
> > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > > watermark) of the partition in the fetch response. However, this is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to
> > the
> > > > > > log-end offset. If it does then you would know for sure that you
> > have
> > > > > > consumed everything for that partition
> > > > >
> > > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > > >
> > > > >
> > > > >
> > > > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:
> > > > >
> > > > > > Yeah I was thinking either along the lines Joel was suggesting or
> > else
> > > > > > adding a logEndOffset(TopicPartition) method or something like
> > that. As
> > > > > > Joel says the consumer actually has this information internally (we
> > > > > return
> > > > > > it with the fetch request) but doesn't expose it.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > > > 2. Make the log end offset available more easily in the
> > consumer.
> > > > > > > >
> > > > > > > > Was thinking something would need to be added in
> > LogCleanerManager,
> > > > > in
> > > > > > > the
> > > > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > > > information
> > > > > > > > to make it more easily available, or would you just expose the
> > > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > > Is it right you would also need to know which
> > > > > offset-cleaner-checkpoint
> > > > > > > > entry related to each active partition?
> > > > > > >
> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it
> > is
> > > > > > > along the lines of: we expose the log-end-offset (actually the
> > high
> > > > > > > watermark) of the partition in the fetch response. However, this
> > is
> > > > > > > not exposed to the consumer (either in the new ConsumerRecord
> > class
> > > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > > were to consume a record you can check that it has offsets up to
> > the
> > > > > > > log-end offset. If it does then you would know for sure that you
> > have
> > > > > > > consumed everything for that partition.
> > > > > > >
> > > > > > > > Yes, was looking at this initially, but as we have 100-150
> > writes
> > > > per
> > > > > > > > second, it could be a while before there is a pause long
> > enough to
> > > > > > check
> > > > > > > it
> > > > > > > > has caught up. Even with the consumer timeout set to -1, it
> > takes
> > > > > some
> > > > > > > time
> > > > > > > > to query the max offset values, which is still long enough for
> > more
> > > > > > > > messages to arrive.
> > > > > > >
> > > > > > > Got it - thanks for clarifying.
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > > You are also correct and perceptive to notice that if you
> > check
> > > > > the
> > > > > > > end
> > > > > > > > > of
> > > > > > > > > > the log then begin consuming and read up to that point
> > > > compaction
> > > > > > may
> > > > > > > > > have
> > > > > > > > > > already kicked in (if the reading takes a while) and hence
> > you
> > > > > > might
> > > > > > > have
> > > > > > > > > > an incomplete snapshot.
> > > > > > > > >
> > > > > > > > > Isn't it sufficient to just repeat the check at the end after
> > > > > reading
> > > > > > > > > the log and repeat until you are truly done? At least for the
> > > > > > purposes
> > > > > > > > > of a snapshot?
> > > > > > > > >
> > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > > > > If you catch up off a compacted topic and keep consuming
> > then
> > > > you
> > > > > > > will
> > > > > > > > > > become consistent with the log.
> > > > > > > > > >
> > > > > > > > > > I think what you are saying is that you want to create a
> > > > snapshot
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > Kafka topic but NOT do continual reads after that point.
> > For
> > > > > > example
> > > > > > > you
> > > > > > > > > > might be creating a backup of the data to a file.
> > > > > > > > > >
> > > > > > > > > > I agree that this isn't as easy as it could be. As you say
> > the
> > > > > only
> > > > > > > > > > solution we have is that timeout which doesn't
> > differentiate
> > > > > > between
> > > > > > > GC
> > > > > > > > > > stall in your process and no more messages left so you
> > would
> > > > need
> > > > > > to
> > > > > > > tune
> > > > > > > > > > the timeout. This is admittedly kind of a hack.
> > > > > > > > > >
> > > > > > > > > > You are also correct and perceptive to notice that if you
> > check
> > > > > the
> > > > > > > end
> > > > > > > > > of
> > > > > > > > > > the log then begin consuming and read up to that point
> > > > compaction
> > > > > > may
> > > > > > > > > have
> > > > > > > > > > already kicked in (if the reading takes a while) and hence
> > you
> > > > > > might
> > > > > > > have
> > > > > > > > > > an incomplete snapshot.
> > > > > > > > > >
> > > > > > > > > > I think there are two features we could add that would make
> > > > this
> > > > > > > easier:
> > > > > > > > > > 1. Make the cleaner point configurable on a per-topic
> > basis.
> > > > This
> > > > > > > feature
> > > > > > > > > > would allow you to control how long the full log is
> > retained
> > > > and
> > > > > > when
> > > > > > > > > > compaction can kick in. This would give a configurable SLA
> > for
> > > > > the
> > > > > > > reader
> > > > > > > > > > process to catch up.
> > > > > > > > > > 2. Make the log end offset available more easily in the
> > > > consumer.
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > > > > > w.f.funnell@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction
> > in
> > > > > order
> > > > > > > to
> > > > > > > > > > > provide streams of messages to our clients.
> > > > > > > > > > >
> > > > > > > > > > > As well as constantly consuming the stream, one of our
> > use
> > > > > cases
> > > > > > > is to
> > > > > > > > > > > provide a snapshot, meaning the user will receive a copy
> > of
> > > > > every
> > > > > > > > > message
> > > > > > > > > > > at least once.
> > > > > > > > > > >
> > > > > > > > > > > Each one of these messages represents an item of content
> > in
> > > > our
> > > > > > > system.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > The problem comes when determining if the client has
> > actually
> > > > > > > reached
> > > > > > > > > the
> > > > > > > > > > > end of the topic.
> > > > > > > > > > >
> > > > > > > > > > > The standard Kafka way of dealing with this seems to be
> > by
> > > > > using
> > > > > > a
> > > > > > > > > > > ConsumerTimeoutException, but we are frequently getting
> > this
> > > > > > error
> > > > > > > > > when the
> > > > > > > > > > > end of the topic has not been reached or even it may
> > take a
> > > > > long
> > > > > > > time
> > > > > > > > > > > before a timeout naturally occurs.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On first glance it would seem possible to do a lookup
> > for the
> > > > > max
> > > > > > > > > offset
> > > > > > > > > > > for each partition when you begin consuming, stopping
> > when
> > > > this
> > > > > > > > > position it
> > > > > > > > > > > reached.
> > > > > > > > > > >
> > > > > > > > > > > But log compaction means that if an update to a piece of
> > > > > content
> > > > > > > > > arrives
> > > > > > > > > > > with the same message key, then this will be written to
> > the
> > > > end
> > > > > > so
> > > > > > > the
> > > > > > > > > > > snapshot will be incomplete.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Another thought is to make use of the cleaner point.
> > > > Currently
> > > > > > > Kafka
> > > > > > > > > writes
> > > > > > > > > > > out to a "cleaner-offset-checkpoint" file in each data
> > > > > directory
> > > > > > > which
> > > > > > > > > is
> > > > > > > > > > > written to after log compaction completes.
> > > > > > > > > > >
> > > > > > > > > > > If the consumer was able to access the
> > > > > cleaner-offset-checkpoint
> > > > > > > you
> > > > > > > > > would
> > > > > > > > > > > be able to consume up to this point, check the point was
> > > > still
> > > > > > the
> > > > > > > > > same,
> > > > > > > > > > > and compaction had not yet occurred, and therefore
> > determine
> > > > > you
> > > > > > > had
> > > > > > > > > > > receive everything at least once. (Assuming there was no
> > race
> > > > > > > condition
> > > > > > > > > > > between compaction and writing to the file)
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Has anybody got any thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Will
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Will Funnell
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Will Funnell
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Will Funnell
> >
> >
> 
> 
> -- 
> Will Funnell


Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
So at what point does the log end offset change? When you commit?

On 19 February 2015 at 18:47, Joel Koshy <jj...@gmail.com> wrote:

> > If I consumed up to the log end offset and log compaction happens in
> > between, I would have missed some messages.
>
> Compaction actually only runs on the rolled over segments (not the
> active - i.e., latest segment). The log-end-offset will be in the
> latest segment which does not participate in compaction.
>
> > > The log end offset is just the end of the committed messages in the log
> > > (the last thing the consumer has access to). It isn't the same as the
> > > cleaner point but is always later than it so it would work just as
> well.
> >
> > Isn't this just roughly the same value as using c.getOffsetsBefore()
> with a
> > partitionRequestTime of -1?
> >
> >
> > Although its always later than the cleaner point, surely log compaction
> is
> > still an issue here.
> >
> > If I consumed up to the log end offset and log compaction happens in
> > between, I would have missed some messages.
> >
> >
> > My thinking was that if you knew the log cleaner point, you could:
> >
> > Make a note of the starting offset
> > Consume till end of log
> > Check my starting point is ahead of current cleaner point, otherwise
> loop.
> >
> >
> > I appreciate there is a chance I misunderstood your point.
> >
> > On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > The log end offset is just the end of the committed messages in the log
> > > (the last thing the consumer has access to). It isn't the same as the
> > > cleaner point but is always later than it so it would work just as
> well.
> > >
> > > -Jay
> > >
> > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w....@gmail.com>
> > > wrote:
> > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > watermark) of the partition in the fetch response. However, this is
> > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > were to consume a record you can check that it has offsets up to
> the
> > > > > log-end offset. If it does then you would know for sure that you
> have
> > > > > consumed everything for that partition
> > > >
> > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > >
> > > >
> > > >
> > > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:
> > > >
> > > > > Yeah I was thinking either along the lines Joel was suggesting or
> else
> > > > > adding a logEndOffset(TopicPartition) method or something like
> that. As
> > > > > Joel says the consumer actually has this information internally (we
> > > > return
> > > > > it with the fetch request) but doesn't expose it.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com>
> > > wrote:
> > > > >
> > > > > > > > 2. Make the log end offset available more easily in the
> consumer.
> > > > > > >
> > > > > > > Was thinking something would need to be added in
> LogCleanerManager,
> > > > in
> > > > > > the
> > > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > > information
> > > > > > > to make it more easily available, or would you just expose the
> > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > Is it right you would also need to know which
> > > > offset-cleaner-checkpoint
> > > > > > > entry related to each active partition?
> > > > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it
> is
> > > > > > along the lines of: we expose the log-end-offset (actually the
> high
> > > > > > watermark) of the partition in the fetch response. However, this
> is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord
> class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to
> the
> > > > > > log-end offset. If it does then you would know for sure that you
> have
> > > > > > consumed everything for that partition.
> > > > > >
> > > > > > > Yes, was looking at this initially, but as we have 100-150
> writes
> > > per
> > > > > > > second, it could be a while before there is a pause long
> enough to
> > > > > check
> > > > > > it
> > > > > > > has caught up. Even with the consumer timeout set to -1, it
> takes
> > > > some
> > > > > > time
> > > > > > > to query the max offset values, which is still long enough for
> more
> > > > > > > messages to arrive.
> > > > > >
> > > > > > Got it - thanks for clarifying.
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > > You are also correct and perceptive to notice that if you
> check
> > > > the
> > > > > > end
> > > > > > > > of
> > > > > > > > > the log then begin consuming and read up to that point
> > > compaction
> > > > > may
> > > > > > > > have
> > > > > > > > > already kicked in (if the reading takes a while) and hence
> you
> > > > > might
> > > > > > have
> > > > > > > > > an incomplete snapshot.
> > > > > > > >
> > > > > > > > Isn't it sufficient to just repeat the check at the end after
> > > > reading
> > > > > > > > the log and repeat until you are truly done? At least for the
> > > > > purposes
> > > > > > > > of a snapshot?
> > > > > > > >
> > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > > > If you catch up off a compacted topic and keep consuming
> then
> > > you
> > > > > > will
> > > > > > > > > become consistent with the log.
> > > > > > > > >
> > > > > > > > > I think what you are saying is that you want to create a
> > > snapshot
> > > > > > from
> > > > > > > > the
> > > > > > > > > Kafka topic but NOT do continual reads after that point.
> For
> > > > > example
> > > > > > you
> > > > > > > > > might be creating a backup of the data to a file.
> > > > > > > > >
> > > > > > > > > I agree that this isn't as easy as it could be. As you say
> the
> > > > only
> > > > > > > > > solution we have is that timeout which doesn't
> differentiate
> > > > > between
> > > > > > GC
> > > > > > > > > stall in your process and no more messages left so you
> would
> > > need
> > > > > to
> > > > > > tune
> > > > > > > > > the timeout. This is admittedly kind of a hack.
> > > > > > > > >
> > > > > > > > > You are also correct and perceptive to notice that if you
> check
> > > > the
> > > > > > end
> > > > > > > > of
> > > > > > > > > the log then begin consuming and read up to that point
> > > compaction
> > > > > may
> > > > > > > > have
> > > > > > > > > already kicked in (if the reading takes a while) and hence
> you
> > > > > might
> > > > > > have
> > > > > > > > > an incomplete snapshot.
> > > > > > > > >
> > > > > > > > > I think there are two features we could add that would make
> > > this
> > > > > > easier:
> > > > > > > > > 1. Make the cleaner point configurable on a per-topic
> basis.
> > > This
> > > > > > feature
> > > > > > > > > would allow you to control how long the full log is
> retained
> > > and
> > > > > when
> > > > > > > > > compaction can kick in. This would give a configurable SLA
> for
> > > > the
> > > > > > reader
> > > > > > > > > process to catch up.
> > > > > > > > > 2. Make the log end offset available more easily in the
> > > consumer.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > > > > w.f.funnell@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction
> in
> > > > order
> > > > > > to
> > > > > > > > > > provide streams of messages to our clients.
> > > > > > > > > >
> > > > > > > > > > As well as constantly consuming the stream, one of our
> use
> > > > cases
> > > > > > is to
> > > > > > > > > > provide a snapshot, meaning the user will receive a copy
> of
> > > > every
> > > > > > > > message
> > > > > > > > > > at least once.
> > > > > > > > > >
> > > > > > > > > > Each one of these messages represents an item of content
> in
> > > our
> > > > > > system.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > The problem comes when determining if the client has
> actually
> > > > > > reached
> > > > > > > > the
> > > > > > > > > > end of the topic.
> > > > > > > > > >
> > > > > > > > > > The standard Kafka way of dealing with this seems to be
> by
> > > > using
> > > > > a
> > > > > > > > > > ConsumerTimeoutException, but we are frequently getting
> this
> > > > > error
> > > > > > > > when the
> > > > > > > > > > end of the topic has not been reached or even it may
> take a
> > > > long
> > > > > > time
> > > > > > > > > > before a timeout naturally occurs.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On first glance it would seem possible to do a lookup
> for the
> > > > max
> > > > > > > > offset
> > > > > > > > > > for each partition when you begin consuming, stopping
> when
> > > this
> > > > > > > > position it
> > > > > > > > > > reached.
> > > > > > > > > >
> > > > > > > > > > But log compaction means that if an update to a piece of
> > > > content
> > > > > > > > arrives
> > > > > > > > > > with the same message key, then this will be written to
> the
> > > end
> > > > > so
> > > > > > the
> > > > > > > > > > snapshot will be incomplete.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Another thought is to make use of the cleaner point.
> > > Currently
> > > > > > Kafka
> > > > > > > > writes
> > > > > > > > > > out to a "cleaner-offset-checkpoint" file in each data
> > > > directory
> > > > > > which
> > > > > > > > is
> > > > > > > > > > written to after log compaction completes.
> > > > > > > > > >
> > > > > > > > > > If the consumer was able to access the
> > > > cleaner-offset-checkpoint
> > > > > > you
> > > > > > > > would
> > > > > > > > > > be able to consume up to this point, check the point was
> > > still
> > > > > the
> > > > > > > > same,
> > > > > > > > > > and compaction had not yet occurred, and therefore
> determine
> > > > you
> > > > > > had
> > > > > > > > > > receive everything at least once. (Assuming there was no
> race
> > > > > > condition
> > > > > > > > > > between compaction and writing to the file)
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Has anybody got any thoughts?
> > > > > > > > > >
> > > > > > > > > > Will
> > > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Will Funnell
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Will Funnell
> > > >
> > >
> >
> >
> >
> > --
> > Will Funnell
>
>


-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Joel Koshy <jj...@gmail.com>.
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.

Compaction actually only runs on the rolled over segments (not the
active - i.e., latest segment). The log-end-offset will be in the
latest segment which does not participate in compaction.

> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> 
> Isn't this just roughly the same value as using c.getOffsetsBefore() with a
> partitionRequestTime of -1?
> 
> 
> Although its always later than the cleaner point, surely log compaction is
> still an issue here.
> 
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.
> 
> 
> My thinking was that if you knew the log cleaner point, you could:
> 
> Make a note of the starting offset
> Consume till end of log
> Check my starting point is ahead of current cleaner point, otherwise loop.
> 
> 
> I appreciate there is a chance I misunderstood your point.
> 
> On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com> wrote:
> 
> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> >
> > -Jay
> >
> > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w....@gmail.com>
> > wrote:
> >
> > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > along the lines of: we expose the log-end-offset (actually the high
> > > > watermark) of the partition in the fetch response. However, this is
> > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > were to consume a record you can check that it has offsets up to the
> > > > log-end offset. If it does then you would know for sure that you have
> > > > consumed everything for that partition
> > >
> > > To confirm then, the log-end-offset is the same as the cleaner point?
> > >
> > >
> > >
> > > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > > Joel says the consumer actually has this information internally (we
> > > return
> > > > it with the fetch request) but doesn't expose it.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com>
> > wrote:
> > > >
> > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > >
> > > > > > Was thinking something would need to be added in LogCleanerManager,
> > > in
> > > > > the
> > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > information
> > > > > > to make it more easily available, or would you just expose the
> > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > Is it right you would also need to know which
> > > offset-cleaner-checkpoint
> > > > > > entry related to each active partition?
> > > > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > watermark) of the partition in the fetch response. However, this is
> > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > were to consume a record you can check that it has offsets up to the
> > > > > log-end offset. If it does then you would know for sure that you have
> > > > > consumed everything for that partition.
> > > > >
> > > > > > Yes, was looking at this initially, but as we have 100-150 writes
> > per
> > > > > > second, it could be a while before there is a pause long enough to
> > > > check
> > > > > it
> > > > > > has caught up. Even with the consumer timeout set to -1, it takes
> > > some
> > > > > time
> > > > > > to query the max offset values, which is still long enough for more
> > > > > > messages to arrive.
> > > > >
> > > > > Got it - thanks for clarifying.
> > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > > You are also correct and perceptive to notice that if you check
> > > the
> > > > > end
> > > > > > > of
> > > > > > > > the log then begin consuming and read up to that point
> > compaction
> > > > may
> > > > > > > have
> > > > > > > > already kicked in (if the reading takes a while) and hence you
> > > > might
> > > > > have
> > > > > > > > an incomplete snapshot.
> > > > > > >
> > > > > > > Isn't it sufficient to just repeat the check at the end after
> > > reading
> > > > > > > the log and repeat until you are truly done? At least for the
> > > > purposes
> > > > > > > of a snapshot?
> > > > > > >
> > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > > If you catch up off a compacted topic and keep consuming then
> > you
> > > > > will
> > > > > > > > become consistent with the log.
> > > > > > > >
> > > > > > > > I think what you are saying is that you want to create a
> > snapshot
> > > > > from
> > > > > > > the
> > > > > > > > Kafka topic but NOT do continual reads after that point. For
> > > > example
> > > > > you
> > > > > > > > might be creating a backup of the data to a file.
> > > > > > > >
> > > > > > > > I agree that this isn't as easy as it could be. As you say the
> > > only
> > > > > > > > solution we have is that timeout which doesn't differentiate
> > > > between
> > > > > GC
> > > > > > > > stall in your process and no more messages left so you would
> > need
> > > > to
> > > > > tune
> > > > > > > > the timeout. This is admittedly kind of a hack.
> > > > > > > >
> > > > > > > > You are also correct and perceptive to notice that if you check
> > > the
> > > > > end
> > > > > > > of
> > > > > > > > the log then begin consuming and read up to that point
> > compaction
> > > > may
> > > > > > > have
> > > > > > > > already kicked in (if the reading takes a while) and hence you
> > > > might
> > > > > have
> > > > > > > > an incomplete snapshot.
> > > > > > > >
> > > > > > > > I think there are two features we could add that would make
> > this
> > > > > easier:
> > > > > > > > 1. Make the cleaner point configurable on a per-topic basis.
> > This
> > > > > feature
> > > > > > > > would allow you to control how long the full log is retained
> > and
> > > > when
> > > > > > > > compaction can kick in. This would give a configurable SLA for
> > > the
> > > > > reader
> > > > > > > > process to catch up.
> > > > > > > > 2. Make the log end offset available more easily in the
> > consumer.
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > > > w.f.funnell@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in
> > > order
> > > > > to
> > > > > > > > > provide streams of messages to our clients.
> > > > > > > > >
> > > > > > > > > As well as constantly consuming the stream, one of our use
> > > cases
> > > > > is to
> > > > > > > > > provide a snapshot, meaning the user will receive a copy of
> > > every
> > > > > > > message
> > > > > > > > > at least once.
> > > > > > > > >
> > > > > > > > > Each one of these messages represents an item of content in
> > our
> > > > > system.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > The problem comes when determining if the client has actually
> > > > > reached
> > > > > > > the
> > > > > > > > > end of the topic.
> > > > > > > > >
> > > > > > > > > The standard Kafka way of dealing with this seems to be by
> > > using
> > > > a
> > > > > > > > > ConsumerTimeoutException, but we are frequently getting this
> > > > error
> > > > > > > when the
> > > > > > > > > end of the topic has not been reached or even it may take a
> > > long
> > > > > time
> > > > > > > > > before a timeout naturally occurs.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On first glance it would seem possible to do a lookup for the
> > > max
> > > > > > > offset
> > > > > > > > > for each partition when you begin consuming, stopping when
> > this
> > > > > > > position it
> > > > > > > > > reached.
> > > > > > > > >
> > > > > > > > > But log compaction means that if an update to a piece of
> > > content
> > > > > > > arrives
> > > > > > > > > with the same message key, then this will be written to the
> > end
> > > > so
> > > > > the
> > > > > > > > > snapshot will be incomplete.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Another thought is to make use of the cleaner point.
> > Currently
> > > > > Kafka
> > > > > > > writes
> > > > > > > > > out to a "cleaner-offset-checkpoint" file in each data
> > > directory
> > > > > which
> > > > > > > is
> > > > > > > > > written to after log compaction completes.
> > > > > > > > >
> > > > > > > > > If the consumer was able to access the
> > > cleaner-offset-checkpoint
> > > > > you
> > > > > > > would
> > > > > > > > > be able to consume up to this point, check the point was
> > still
> > > > the
> > > > > > > same,
> > > > > > > > > and compaction had not yet occurred, and therefore determine
> > > you
> > > > > had
> > > > > > > > > receive everything at least once. (Assuming there was no race
> > > > > condition
> > > > > > > > > between compaction and writing to the file)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Has anybody got any thoughts?
> > > > > > > > >
> > > > > > > > > Will
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Will Funnell
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Will Funnell
> > >
> >
> 
> 
> 
> -- 
> Will Funnell


Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
> The log end offset is just the end of the committed messages in the log
> (the last thing the consumer has access to). It isn't the same as the
> cleaner point but is always later than it so it would work just as well.

Isn't this just roughly the same value as using c.getOffsetsBefore() with a
partitionRequestTime of -1?


Although its always later than the cleaner point, surely log compaction is
still an issue here.

If I consumed up to the log end offset and log compaction happens in
between, I would have missed some messages.


My thinking was that if you knew the log cleaner point, you could:

Make a note of the starting offset
Consume till end of log
Check my starting point is ahead of current cleaner point, otherwise loop.


I appreciate there is a chance I misunderstood your point.

On 19 February 2015 at 18:02, Jay Kreps <ja...@gmail.com> wrote:

> The log end offset is just the end of the committed messages in the log
> (the last thing the consumer has access to). It isn't the same as the
> cleaner point but is always later than it so it would work just as well.
>
> -Jay
>
> On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w....@gmail.com>
> wrote:
>
> > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > along the lines of: we expose the log-end-offset (actually the high
> > > watermark) of the partition in the fetch response. However, this is
> > > not exposed to the consumer (either in the new ConsumerRecord class
> > > or the existing MessageAndMetadata class). If we did, then if you
> > > were to consume a record you can check that it has offsets up to the
> > > log-end offset. If it does then you would know for sure that you have
> > > consumed everything for that partition
> >
> > To confirm then, the log-end-offset is the same as the cleaner point?
> >
> >
> >
> > On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > Joel says the consumer actually has this information internally (we
> > return
> > > it with the fetch request) but doesn't expose it.
> > >
> > > -Jay
> > >
> > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com>
> wrote:
> > >
> > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > >
> > > > > Was thinking something would need to be added in LogCleanerManager,
> > in
> > > > the
> > > > > updateCheckpoints function. Where would be best to publish the
> > > > information
> > > > > to make it more easily available, or would you just expose the
> > > > > offset-cleaner-checkpoint file as it is?
> > > > > Is it right you would also need to know which
> > offset-cleaner-checkpoint
> > > > > entry related to each active partition?
> > > >
> > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > along the lines of: we expose the log-end-offset (actually the high
> > > > watermark) of the partition in the fetch response. However, this is
> > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > were to consume a record you can check that it has offsets up to the
> > > > log-end offset. If it does then you would know for sure that you have
> > > > consumed everything for that partition.
> > > >
> > > > > Yes, was looking at this initially, but as we have 100-150 writes
> per
> > > > > second, it could be a while before there is a pause long enough to
> > > check
> > > > it
> > > > > has caught up. Even with the consumer timeout set to -1, it takes
> > some
> > > > time
> > > > > to query the max offset values, which is still long enough for more
> > > > > messages to arrive.
> > > >
> > > > Got it - thanks for clarifying.
> > > >
> > > > >
> > > > >
> > > > >
> > > > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com>
> > wrote:
> > > > >
> > > > > > > You are also correct and perceptive to notice that if you check
> > the
> > > > end
> > > > > > of
> > > > > > > the log then begin consuming and read up to that point
> compaction
> > > may
> > > > > > have
> > > > > > > already kicked in (if the reading takes a while) and hence you
> > > might
> > > > have
> > > > > > > an incomplete snapshot.
> > > > > >
> > > > > > Isn't it sufficient to just repeat the check at the end after
> > reading
> > > > > > the log and repeat until you are truly done? At least for the
> > > purposes
> > > > > > of a snapshot?
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > If you catch up off a compacted topic and keep consuming then
> you
> > > > will
> > > > > > > become consistent with the log.
> > > > > > >
> > > > > > > I think what you are saying is that you want to create a
> snapshot
> > > > from
> > > > > > the
> > > > > > > Kafka topic but NOT do continual reads after that point. For
> > > example
> > > > you
> > > > > > > might be creating a backup of the data to a file.
> > > > > > >
> > > > > > > I agree that this isn't as easy as it could be. As you say the
> > only
> > > > > > > solution we have is that timeout which doesn't differentiate
> > > between
> > > > GC
> > > > > > > stall in your process and no more messages left so you would
> need
> > > to
> > > > tune
> > > > > > > the timeout. This is admittedly kind of a hack.
> > > > > > >
> > > > > > > You are also correct and perceptive to notice that if you check
> > the
> > > > end
> > > > > > of
> > > > > > > the log then begin consuming and read up to that point
> compaction
> > > may
> > > > > > have
> > > > > > > already kicked in (if the reading takes a while) and hence you
> > > might
> > > > have
> > > > > > > an incomplete snapshot.
> > > > > > >
> > > > > > > I think there are two features we could add that would make
> this
> > > > easier:
> > > > > > > 1. Make the cleaner point configurable on a per-topic basis.
> This
> > > > feature
> > > > > > > would allow you to control how long the full log is retained
> and
> > > when
> > > > > > > compaction can kick in. This would give a configurable SLA for
> > the
> > > > reader
> > > > > > > process to catch up.
> > > > > > > 2. Make the log end offset available more easily in the
> consumer.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > > w.f.funnell@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in
> > order
> > > > to
> > > > > > > > provide streams of messages to our clients.
> > > > > > > >
> > > > > > > > As well as constantly consuming the stream, one of our use
> > cases
> > > > is to
> > > > > > > > provide a snapshot, meaning the user will receive a copy of
> > every
> > > > > > message
> > > > > > > > at least once.
> > > > > > > >
> > > > > > > > Each one of these messages represents an item of content in
> our
> > > > system.
> > > > > > > >
> > > > > > > >
> > > > > > > > The problem comes when determining if the client has actually
> > > > reached
> > > > > > the
> > > > > > > > end of the topic.
> > > > > > > >
> > > > > > > > The standard Kafka way of dealing with this seems to be by
> > using
> > > a
> > > > > > > > ConsumerTimeoutException, but we are frequently getting this
> > > error
> > > > > > when the
> > > > > > > > end of the topic has not been reached or even it may take a
> > long
> > > > time
> > > > > > > > before a timeout naturally occurs.
> > > > > > > >
> > > > > > > >
> > > > > > > > On first glance it would seem possible to do a lookup for the
> > max
> > > > > > offset
> > > > > > > > for each partition when you begin consuming, stopping when
> this
> > > > > > position it
> > > > > > > > reached.
> > > > > > > >
> > > > > > > > But log compaction means that if an update to a piece of
> > content
> > > > > > arrives
> > > > > > > > with the same message key, then this will be written to the
> end
> > > so
> > > > the
> > > > > > > > snapshot will be incomplete.
> > > > > > > >
> > > > > > > >
> > > > > > > > Another thought is to make use of the cleaner point.
> Currently
> > > > Kafka
> > > > > > writes
> > > > > > > > out to a "cleaner-offset-checkpoint" file in each data
> > directory
> > > > which
> > > > > > is
> > > > > > > > written to after log compaction completes.
> > > > > > > >
> > > > > > > > If the consumer was able to access the
> > cleaner-offset-checkpoint
> > > > you
> > > > > > would
> > > > > > > > be able to consume up to this point, check the point was
> still
> > > the
> > > > > > same,
> > > > > > > > and compaction had not yet occurred, and therefore determine
> > you
> > > > had
> > > > > > > > receive everything at least once. (Assuming there was no race
> > > > condition
> > > > > > > > between compaction and writing to the file)
> > > > > > > >
> > > > > > > >
> > > > > > > > Has anybody got any thoughts?
> > > > > > > >
> > > > > > > > Will
> > > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Will Funnell
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Will Funnell
> >
>



-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Jay Kreps <ja...@gmail.com>.
The log end offset is just the end of the committed messages in the log
(the last thing the consumer has access to). It isn't the same as the
cleaner point but is always later than it so it would work just as well.

-Jay

On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w....@gmail.com> wrote:

> > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > along the lines of: we expose the log-end-offset (actually the high
> > watermark) of the partition in the fetch response. However, this is
> > not exposed to the consumer (either in the new ConsumerRecord class
> > or the existing MessageAndMetadata class). If we did, then if you
> > were to consume a record you can check that it has offsets up to the
> > log-end offset. If it does then you would know for sure that you have
> > consumed everything for that partition
>
> To confirm then, the log-end-offset is the same as the cleaner point?
>
>
>
> On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yeah I was thinking either along the lines Joel was suggesting or else
> > adding a logEndOffset(TopicPartition) method or something like that. As
> > Joel says the consumer actually has this information internally (we
> return
> > it with the fetch request) but doesn't expose it.
> >
> > -Jay
> >
> > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > > > 2. Make the log end offset available more easily in the consumer.
> > > >
> > > > Was thinking something would need to be added in LogCleanerManager,
> in
> > > the
> > > > updateCheckpoints function. Where would be best to publish the
> > > information
> > > > to make it more easily available, or would you just expose the
> > > > offset-cleaner-checkpoint file as it is?
> > > > Is it right you would also need to know which
> offset-cleaner-checkpoint
> > > > entry related to each active partition?
> > >
> > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > along the lines of: we expose the log-end-offset (actually the high
> > > watermark) of the partition in the fetch response. However, this is
> > > not exposed to the consumer (either in the new ConsumerRecord class
> > > or the existing MessageAndMetadata class). If we did, then if you
> > > were to consume a record you can check that it has offsets up to the
> > > log-end offset. If it does then you would know for sure that you have
> > > consumed everything for that partition.
> > >
> > > > Yes, was looking at this initially, but as we have 100-150 writes per
> > > > second, it could be a while before there is a pause long enough to
> > check
> > > it
> > > > has caught up. Even with the consumer timeout set to -1, it takes
> some
> > > time
> > > > to query the max offset values, which is still long enough for more
> > > > messages to arrive.
> > >
> > > Got it - thanks for clarifying.
> > >
> > > >
> > > >
> > > >
> > > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com>
> wrote:
> > > >
> > > > > > You are also correct and perceptive to notice that if you check
> the
> > > end
> > > > > of
> > > > > > the log then begin consuming and read up to that point compaction
> > may
> > > > > have
> > > > > > already kicked in (if the reading takes a while) and hence you
> > might
> > > have
> > > > > > an incomplete snapshot.
> > > > >
> > > > > Isn't it sufficient to just repeat the check at the end after
> reading
> > > > > the log and repeat until you are truly done? At least for the
> > purposes
> > > > > of a snapshot?
> > > > >
> > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > If you catch up off a compacted topic and keep consuming then you
> > > will
> > > > > > become consistent with the log.
> > > > > >
> > > > > > I think what you are saying is that you want to create a snapshot
> > > from
> > > > > the
> > > > > > Kafka topic but NOT do continual reads after that point. For
> > example
> > > you
> > > > > > might be creating a backup of the data to a file.
> > > > > >
> > > > > > I agree that this isn't as easy as it could be. As you say the
> only
> > > > > > solution we have is that timeout which doesn't differentiate
> > between
> > > GC
> > > > > > stall in your process and no more messages left so you would need
> > to
> > > tune
> > > > > > the timeout. This is admittedly kind of a hack.
> > > > > >
> > > > > > You are also correct and perceptive to notice that if you check
> the
> > > end
> > > > > of
> > > > > > the log then begin consuming and read up to that point compaction
> > may
> > > > > have
> > > > > > already kicked in (if the reading takes a while) and hence you
> > might
> > > have
> > > > > > an incomplete snapshot.
> > > > > >
> > > > > > I think there are two features we could add that would make this
> > > easier:
> > > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > > feature
> > > > > > would allow you to control how long the full log is retained and
> > when
> > > > > > compaction can kick in. This would give a configurable SLA for
> the
> > > reader
> > > > > > process to catch up.
> > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > > w.f.funnell@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in
> order
> > > to
> > > > > > > provide streams of messages to our clients.
> > > > > > >
> > > > > > > As well as constantly consuming the stream, one of our use
> cases
> > > is to
> > > > > > > provide a snapshot, meaning the user will receive a copy of
> every
> > > > > message
> > > > > > > at least once.
> > > > > > >
> > > > > > > Each one of these messages represents an item of content in our
> > > system.
> > > > > > >
> > > > > > >
> > > > > > > The problem comes when determining if the client has actually
> > > reached
> > > > > the
> > > > > > > end of the topic.
> > > > > > >
> > > > > > > The standard Kafka way of dealing with this seems to be by
> using
> > a
> > > > > > > ConsumerTimeoutException, but we are frequently getting this
> > error
> > > > > when the
> > > > > > > end of the topic has not been reached or even it may take a
> long
> > > time
> > > > > > > before a timeout naturally occurs.
> > > > > > >
> > > > > > >
> > > > > > > On first glance it would seem possible to do a lookup for the
> max
> > > > > offset
> > > > > > > for each partition when you begin consuming, stopping when this
> > > > > position it
> > > > > > > reached.
> > > > > > >
> > > > > > > But log compaction means that if an update to a piece of
> content
> > > > > arrives
> > > > > > > with the same message key, then this will be written to the end
> > so
> > > the
> > > > > > > snapshot will be incomplete.
> > > > > > >
> > > > > > >
> > > > > > > Another thought is to make use of the cleaner point. Currently
> > > Kafka
> > > > > writes
> > > > > > > out to a "cleaner-offset-checkpoint" file in each data
> directory
> > > which
> > > > > is
> > > > > > > written to after log compaction completes.
> > > > > > >
> > > > > > > If the consumer was able to access the
> cleaner-offset-checkpoint
> > > you
> > > > > would
> > > > > > > be able to consume up to this point, check the point was still
> > the
> > > > > same,
> > > > > > > and compaction had not yet occurred, and therefore determine
> you
> > > had
> > > > > > > receive everything at least once. (Assuming there was no race
> > > condition
> > > > > > > between compaction and writing to the file)
> > > > > > >
> > > > > > >
> > > > > > > Has anybody got any thoughts?
> > > > > > >
> > > > > > > Will
> > > > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Will Funnell
> > >
> > >
> >
>
>
>
> --
> Will Funnell
>

Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
> I'm not sure if I misunderstood Jay's suggestion, but I think it is
> along the lines of: we expose the log-end-offset (actually the high
> watermark) of the partition in the fetch response. However, this is
> not exposed to the consumer (either in the new ConsumerRecord class
> or the existing MessageAndMetadata class). If we did, then if you
> were to consume a record you can check that it has offsets up to the
> log-end offset. If it does then you would know for sure that you have
> consumed everything for that partition

To confirm then, the log-end-offset is the same as the cleaner point?



On 19 February 2015 at 03:10, Jay Kreps <ja...@gmail.com> wrote:

> Yeah I was thinking either along the lines Joel was suggesting or else
> adding a logEndOffset(TopicPartition) method or something like that. As
> Joel says the consumer actually has this information internally (we return
> it with the fetch request) but doesn't expose it.
>
> -Jay
>
> On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com> wrote:
>
> > > > 2. Make the log end offset available more easily in the consumer.
> > >
> > > Was thinking something would need to be added in LogCleanerManager, in
> > the
> > > updateCheckpoints function. Where would be best to publish the
> > information
> > > to make it more easily available, or would you just expose the
> > > offset-cleaner-checkpoint file as it is?
> > > Is it right you would also need to know which offset-cleaner-checkpoint
> > > entry related to each active partition?
> >
> > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > along the lines of: we expose the log-end-offset (actually the high
> > watermark) of the partition in the fetch response. However, this is
> > not exposed to the consumer (either in the new ConsumerRecord class
> > or the existing MessageAndMetadata class). If we did, then if you
> > were to consume a record you can check that it has offsets up to the
> > log-end offset. If it does then you would know for sure that you have
> > consumed everything for that partition.
> >
> > > Yes, was looking at this initially, but as we have 100-150 writes per
> > > second, it could be a while before there is a pause long enough to
> check
> > it
> > > has caught up. Even with the consumer timeout set to -1, it takes some
> > time
> > > to query the max offset values, which is still long enough for more
> > > messages to arrive.
> >
> > Got it - thanks for clarifying.
> >
> > >
> > >
> > >
> > > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com> wrote:
> > >
> > > > > You are also correct and perceptive to notice that if you check the
> > end
> > > > of
> > > > > the log then begin consuming and read up to that point compaction
> may
> > > > have
> > > > > already kicked in (if the reading takes a while) and hence you
> might
> > have
> > > > > an incomplete snapshot.
> > > >
> > > > Isn't it sufficient to just repeat the check at the end after reading
> > > > the log and repeat until you are truly done? At least for the
> purposes
> > > > of a snapshot?
> > > >
> > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > If you catch up off a compacted topic and keep consuming then you
> > will
> > > > > become consistent with the log.
> > > > >
> > > > > I think what you are saying is that you want to create a snapshot
> > from
> > > > the
> > > > > Kafka topic but NOT do continual reads after that point. For
> example
> > you
> > > > > might be creating a backup of the data to a file.
> > > > >
> > > > > I agree that this isn't as easy as it could be. As you say the only
> > > > > solution we have is that timeout which doesn't differentiate
> between
> > GC
> > > > > stall in your process and no more messages left so you would need
> to
> > tune
> > > > > the timeout. This is admittedly kind of a hack.
> > > > >
> > > > > You are also correct and perceptive to notice that if you check the
> > end
> > > > of
> > > > > the log then begin consuming and read up to that point compaction
> may
> > > > have
> > > > > already kicked in (if the reading takes a while) and hence you
> might
> > have
> > > > > an incomplete snapshot.
> > > > >
> > > > > I think there are two features we could add that would make this
> > easier:
> > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > feature
> > > > > would allow you to control how long the full log is retained and
> when
> > > > > compaction can kick in. This would give a configurable SLA for the
> > reader
> > > > > process to catch up.
> > > > > 2. Make the log end offset available more easily in the consumer.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> > w.f.funnell@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > We are currently using Kafka 0.8.1.1 with log compaction in order
> > to
> > > > > > provide streams of messages to our clients.
> > > > > >
> > > > > > As well as constantly consuming the stream, one of our use cases
> > is to
> > > > > > provide a snapshot, meaning the user will receive a copy of every
> > > > message
> > > > > > at least once.
> > > > > >
> > > > > > Each one of these messages represents an item of content in our
> > system.
> > > > > >
> > > > > >
> > > > > > The problem comes when determining if the client has actually
> > reached
> > > > the
> > > > > > end of the topic.
> > > > > >
> > > > > > The standard Kafka way of dealing with this seems to be by using
> a
> > > > > > ConsumerTimeoutException, but we are frequently getting this
> error
> > > > when the
> > > > > > end of the topic has not been reached or even it may take a long
> > time
> > > > > > before a timeout naturally occurs.
> > > > > >
> > > > > >
> > > > > > On first glance it would seem possible to do a lookup for the max
> > > > offset
> > > > > > for each partition when you begin consuming, stopping when this
> > > > position it
> > > > > > reached.
> > > > > >
> > > > > > But log compaction means that if an update to a piece of content
> > > > arrives
> > > > > > with the same message key, then this will be written to the end
> so
> > the
> > > > > > snapshot will be incomplete.
> > > > > >
> > > > > >
> > > > > > Another thought is to make use of the cleaner point. Currently
> > Kafka
> > > > writes
> > > > > > out to a "cleaner-offset-checkpoint" file in each data directory
> > which
> > > > is
> > > > > > written to after log compaction completes.
> > > > > >
> > > > > > If the consumer was able to access the cleaner-offset-checkpoint
> > you
> > > > would
> > > > > > be able to consume up to this point, check the point was still
> the
> > > > same,
> > > > > > and compaction had not yet occurred, and therefore determine you
> > had
> > > > > > receive everything at least once. (Assuming there was no race
> > condition
> > > > > > between compaction and writing to the file)
> > > > > >
> > > > > >
> > > > > > Has anybody got any thoughts?
> > > > > >
> > > > > > Will
> > > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Will Funnell
> >
> >
>



-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Jay Kreps <ja...@gmail.com>.
Yeah I was thinking either along the lines Joel was suggesting or else
adding a logEndOffset(TopicPartition) method or something like that. As
Joel says the consumer actually has this information internally (we return
it with the fetch request) but doesn't expose it.

-Jay

On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jj...@gmail.com> wrote:

> > > 2. Make the log end offset available more easily in the consumer.
> >
> > Was thinking something would need to be added in LogCleanerManager, in
> the
> > updateCheckpoints function. Where would be best to publish the
> information
> > to make it more easily available, or would you just expose the
> > offset-cleaner-checkpoint file as it is?
> > Is it right you would also need to know which offset-cleaner-checkpoint
> > entry related to each active partition?
>
> I'm not sure if I misunderstood Jay's suggestion, but I think it is
> along the lines of: we expose the log-end-offset (actually the high
> watermark) of the partition in the fetch response. However, this is
> not exposed to the consumer (either in the new ConsumerRecord class
> or the existing MessageAndMetadata class). If we did, then if you
> were to consume a record you can check that it has offsets up to the
> log-end offset. If it does then you would know for sure that you have
> consumed everything for that partition.
>
> > Yes, was looking at this initially, but as we have 100-150 writes per
> > second, it could be a while before there is a pause long enough to check
> it
> > has caught up. Even with the consumer timeout set to -1, it takes some
> time
> > to query the max offset values, which is still long enough for more
> > messages to arrive.
>
> Got it - thanks for clarifying.
>
> >
> >
> >
> > On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > > You are also correct and perceptive to notice that if you check the
> end
> > > of
> > > > the log then begin consuming and read up to that point compaction may
> > > have
> > > > already kicked in (if the reading takes a while) and hence you might
> have
> > > > an incomplete snapshot.
> > >
> > > Isn't it sufficient to just repeat the check at the end after reading
> > > the log and repeat until you are truly done? At least for the purposes
> > > of a snapshot?
> > >
> > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > If you catch up off a compacted topic and keep consuming then you
> will
> > > > become consistent with the log.
> > > >
> > > > I think what you are saying is that you want to create a snapshot
> from
> > > the
> > > > Kafka topic but NOT do continual reads after that point. For example
> you
> > > > might be creating a backup of the data to a file.
> > > >
> > > > I agree that this isn't as easy as it could be. As you say the only
> > > > solution we have is that timeout which doesn't differentiate between
> GC
> > > > stall in your process and no more messages left so you would need to
> tune
> > > > the timeout. This is admittedly kind of a hack.
> > > >
> > > > You are also correct and perceptive to notice that if you check the
> end
> > > of
> > > > the log then begin consuming and read up to that point compaction may
> > > have
> > > > already kicked in (if the reading takes a while) and hence you might
> have
> > > > an incomplete snapshot.
> > > >
> > > > I think there are two features we could add that would make this
> easier:
> > > > 1. Make the cleaner point configurable on a per-topic basis. This
> feature
> > > > would allow you to control how long the full log is retained and when
> > > > compaction can kick in. This would give a configurable SLA for the
> reader
> > > > process to catch up.
> > > > 2. Make the log end offset available more easily in the consumer.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <
> w.f.funnell@gmail.com>
> > > > wrote:
> > > >
> > > > > We are currently using Kafka 0.8.1.1 with log compaction in order
> to
> > > > > provide streams of messages to our clients.
> > > > >
> > > > > As well as constantly consuming the stream, one of our use cases
> is to
> > > > > provide a snapshot, meaning the user will receive a copy of every
> > > message
> > > > > at least once.
> > > > >
> > > > > Each one of these messages represents an item of content in our
> system.
> > > > >
> > > > >
> > > > > The problem comes when determining if the client has actually
> reached
> > > the
> > > > > end of the topic.
> > > > >
> > > > > The standard Kafka way of dealing with this seems to be by using a
> > > > > ConsumerTimeoutException, but we are frequently getting this error
> > > when the
> > > > > end of the topic has not been reached or even it may take a long
> time
> > > > > before a timeout naturally occurs.
> > > > >
> > > > >
> > > > > On first glance it would seem possible to do a lookup for the max
> > > offset
> > > > > for each partition when you begin consuming, stopping when this
> > > position it
> > > > > reached.
> > > > >
> > > > > But log compaction means that if an update to a piece of content
> > > arrives
> > > > > with the same message key, then this will be written to the end so
> the
> > > > > snapshot will be incomplete.
> > > > >
> > > > >
> > > > > Another thought is to make use of the cleaner point. Currently
> Kafka
> > > writes
> > > > > out to a "cleaner-offset-checkpoint" file in each data directory
> which
> > > is
> > > > > written to after log compaction completes.
> > > > >
> > > > > If the consumer was able to access the cleaner-offset-checkpoint
> you
> > > would
> > > > > be able to consume up to this point, check the point was still the
> > > same,
> > > > > and compaction had not yet occurred, and therefore determine you
> had
> > > > > receive everything at least once. (Assuming there was no race
> condition
> > > > > between compaction and writing to the file)
> > > > >
> > > > >
> > > > > Has anybody got any thoughts?
> > > > >
> > > > > Will
> > > > >
> > >
> > >
> >
> >
> > --
> > Will Funnell
>
>

Re: Consuming a snapshot from log compacted topic

Posted by Joel Koshy <jj...@gmail.com>.
> > 2. Make the log end offset available more easily in the consumer.
> 
> Was thinking something would need to be added in LogCleanerManager, in the
> updateCheckpoints function. Where would be best to publish the information
> to make it more easily available, or would you just expose the
> offset-cleaner-checkpoint file as it is?
> Is it right you would also need to know which offset-cleaner-checkpoint
> entry related to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is
along the lines of: we expose the log-end-offset (actually the high
watermark) of the partition in the fetch response. However, this is
not exposed to the consumer (either in the new ConsumerRecord class
or the existing MessageAndMetadata class). If we did, then if you
were to consume a record you can check that it has offsets up to the
log-end offset. If it does then you would know for sure that you have
consumed everything for that partition.

> Yes, was looking at this initially, but as we have 100-150 writes per
> second, it could be a while before there is a pause long enough to check it
> has caught up. Even with the consumer timeout set to -1, it takes some time
> to query the max offset values, which is still long enough for more
> messages to arrive.

Got it - thanks for clarifying.

> 
> 
> 
> On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com> wrote:
> 
> > > You are also correct and perceptive to notice that if you check the end
> > of
> > > the log then begin consuming and read up to that point compaction may
> > have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> >
> > Isn't it sufficient to just repeat the check at the end after reading
> > the log and repeat until you are truly done? At least for the purposes
> > of a snapshot?
> >
> > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > If you catch up off a compacted topic and keep consuming then you will
> > > become consistent with the log.
> > >
> > > I think what you are saying is that you want to create a snapshot from
> > the
> > > Kafka topic but NOT do continual reads after that point. For example you
> > > might be creating a backup of the data to a file.
> > >
> > > I agree that this isn't as easy as it could be. As you say the only
> > > solution we have is that timeout which doesn't differentiate between GC
> > > stall in your process and no more messages left so you would need to tune
> > > the timeout. This is admittedly kind of a hack.
> > >
> > > You are also correct and perceptive to notice that if you check the end
> > of
> > > the log then begin consuming and read up to that point compaction may
> > have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> > >
> > > I think there are two features we could add that would make this easier:
> > > 1. Make the cleaner point configurable on a per-topic basis. This feature
> > > would allow you to control how long the full log is retained and when
> > > compaction can kick in. This would give a configurable SLA for the reader
> > > process to catch up.
> > > 2. Make the log end offset available more easily in the consumer.
> > >
> > > -Jay
> > >
> > >
> > >
> > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w....@gmail.com>
> > > wrote:
> > >
> > > > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > > > provide streams of messages to our clients.
> > > >
> > > > As well as constantly consuming the stream, one of our use cases is to
> > > > provide a snapshot, meaning the user will receive a copy of every
> > message
> > > > at least once.
> > > >
> > > > Each one of these messages represents an item of content in our system.
> > > >
> > > >
> > > > The problem comes when determining if the client has actually reached
> > the
> > > > end of the topic.
> > > >
> > > > The standard Kafka way of dealing with this seems to be by using a
> > > > ConsumerTimeoutException, but we are frequently getting this error
> > when the
> > > > end of the topic has not been reached or even it may take a long time
> > > > before a timeout naturally occurs.
> > > >
> > > >
> > > > On first glance it would seem possible to do a lookup for the max
> > offset
> > > > for each partition when you begin consuming, stopping when this
> > position it
> > > > reached.
> > > >
> > > > But log compaction means that if an update to a piece of content
> > arrives
> > > > with the same message key, then this will be written to the end so the
> > > > snapshot will be incomplete.
> > > >
> > > >
> > > > Another thought is to make use of the cleaner point. Currently Kafka
> > writes
> > > > out to a "cleaner-offset-checkpoint" file in each data directory which
> > is
> > > > written to after log compaction completes.
> > > >
> > > > If the consumer was able to access the cleaner-offset-checkpoint you
> > would
> > > > be able to consume up to this point, check the point was still the
> > same,
> > > > and compaction had not yet occurred, and therefore determine you had
> > > > receive everything at least once. (Assuming there was no race condition
> > > > between compaction and writing to the file)
> > > >
> > > >
> > > > Has anybody got any thoughts?
> > > >
> > > > Will
> > > >
> >
> >
> 
> 
> -- 
> Will Funnell


Re: Consuming a snapshot from log compacted topic

Posted by Will Funnell <w....@gmail.com>.
> Do you have to separate the snapshot from the "normal" update flow.

We are trying to avoid using another datasource if possible to have one
source of truth.

> I think what you are saying is that you want to create a snapshot from the
> Kafka topic but NOT do continual reads after that point. For example you
> might be creating a backup of the data to a file.

Yes, this is correct.

> I think there are two features we could add that would make this easier:
> 1. Make the cleaner point configurable on a per-topic basis. This feature
> would allow you to control how long the full log is retained and when
> compaction can kick in. This would give a configurable SLA for the reader
> process to catch up.

That sounds like it could work, I think if you could schedule the
time/frequency which compaction occured, clients could be scheduled to run
in between.

> 2. Make the log end offset available more easily in the consumer.

Was thinking something would need to be added in LogCleanerManager, in the
updateCheckpoints function. Where would be best to publish the information
to make it more easily available, or would you just expose the
offset-cleaner-checkpoint file as it is?
Is it right you would also need to know which offset-cleaner-checkpoint
entry related to each active partition?

> Isn't it sufficient to just repeat the check at the end after reading
> the log and repeat until you are truly done? At least for the purposes
> of a snapshot?

Yes, was looking at this initially, but as we have 100-150 writes per
second, it could be a while before there is a pause long enough to check it
has caught up. Even with the consumer timeout set to -1, it takes some time
to query the max offset values, which is still long enough for more
messages to arrive.



On 18 February 2015 at 23:16, Joel Koshy <jj...@gmail.com> wrote:

> > You are also correct and perceptive to notice that if you check the end
> of
> > the log then begin consuming and read up to that point compaction may
> have
> > already kicked in (if the reading takes a while) and hence you might have
> > an incomplete snapshot.
>
> Isn't it sufficient to just repeat the check at the end after reading
> the log and repeat until you are truly done? At least for the purposes
> of a snapshot?
>
> On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > If you catch up off a compacted topic and keep consuming then you will
> > become consistent with the log.
> >
> > I think what you are saying is that you want to create a snapshot from
> the
> > Kafka topic but NOT do continual reads after that point. For example you
> > might be creating a backup of the data to a file.
> >
> > I agree that this isn't as easy as it could be. As you say the only
> > solution we have is that timeout which doesn't differentiate between GC
> > stall in your process and no more messages left so you would need to tune
> > the timeout. This is admittedly kind of a hack.
> >
> > You are also correct and perceptive to notice that if you check the end
> of
> > the log then begin consuming and read up to that point compaction may
> have
> > already kicked in (if the reading takes a while) and hence you might have
> > an incomplete snapshot.
> >
> > I think there are two features we could add that would make this easier:
> > 1. Make the cleaner point configurable on a per-topic basis. This feature
> > would allow you to control how long the full log is retained and when
> > compaction can kick in. This would give a configurable SLA for the reader
> > process to catch up.
> > 2. Make the log end offset available more easily in the consumer.
> >
> > -Jay
> >
> >
> >
> > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w....@gmail.com>
> > wrote:
> >
> > > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > > provide streams of messages to our clients.
> > >
> > > As well as constantly consuming the stream, one of our use cases is to
> > > provide a snapshot, meaning the user will receive a copy of every
> message
> > > at least once.
> > >
> > > Each one of these messages represents an item of content in our system.
> > >
> > >
> > > The problem comes when determining if the client has actually reached
> the
> > > end of the topic.
> > >
> > > The standard Kafka way of dealing with this seems to be by using a
> > > ConsumerTimeoutException, but we are frequently getting this error
> when the
> > > end of the topic has not been reached or even it may take a long time
> > > before a timeout naturally occurs.
> > >
> > >
> > > On first glance it would seem possible to do a lookup for the max
> offset
> > > for each partition when you begin consuming, stopping when this
> position it
> > > reached.
> > >
> > > But log compaction means that if an update to a piece of content
> arrives
> > > with the same message key, then this will be written to the end so the
> > > snapshot will be incomplete.
> > >
> > >
> > > Another thought is to make use of the cleaner point. Currently Kafka
> writes
> > > out to a "cleaner-offset-checkpoint" file in each data directory which
> is
> > > written to after log compaction completes.
> > >
> > > If the consumer was able to access the cleaner-offset-checkpoint you
> would
> > > be able to consume up to this point, check the point was still the
> same,
> > > and compaction had not yet occurred, and therefore determine you had
> > > receive everything at least once. (Assuming there was no race condition
> > > between compaction and writing to the file)
> > >
> > >
> > > Has anybody got any thoughts?
> > >
> > > Will
> > >
>
>


-- 
Will Funnell

Re: Consuming a snapshot from log compacted topic

Posted by Joel Koshy <jj...@gmail.com>.
> You are also correct and perceptive to notice that if you check the end of
> the log then begin consuming and read up to that point compaction may have
> already kicked in (if the reading takes a while) and hence you might have
> an incomplete snapshot.

Isn't it sufficient to just repeat the check at the end after reading
the log and repeat until you are truly done? At least for the purposes
of a snapshot?

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> If you catch up off a compacted topic and keep consuming then you will
> become consistent with the log.
> 
> I think what you are saying is that you want to create a snapshot from the
> Kafka topic but NOT do continual reads after that point. For example you
> might be creating a backup of the data to a file.
> 
> I agree that this isn't as easy as it could be. As you say the only
> solution we have is that timeout which doesn't differentiate between GC
> stall in your process and no more messages left so you would need to tune
> the timeout. This is admittedly kind of a hack.
> 
> You are also correct and perceptive to notice that if you check the end of
> the log then begin consuming and read up to that point compaction may have
> already kicked in (if the reading takes a while) and hence you might have
> an incomplete snapshot.
> 
> I think there are two features we could add that would make this easier:
> 1. Make the cleaner point configurable on a per-topic basis. This feature
> would allow you to control how long the full log is retained and when
> compaction can kick in. This would give a configurable SLA for the reader
> process to catch up.
> 2. Make the log end offset available more easily in the consumer.
> 
> -Jay
> 
> 
> 
> On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w....@gmail.com>
> wrote:
> 
> > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > provide streams of messages to our clients.
> >
> > As well as constantly consuming the stream, one of our use cases is to
> > provide a snapshot, meaning the user will receive a copy of every message
> > at least once.
> >
> > Each one of these messages represents an item of content in our system.
> >
> >
> > The problem comes when determining if the client has actually reached the
> > end of the topic.
> >
> > The standard Kafka way of dealing with this seems to be by using a
> > ConsumerTimeoutException, but we are frequently getting this error when the
> > end of the topic has not been reached or even it may take a long time
> > before a timeout naturally occurs.
> >
> >
> > On first glance it would seem possible to do a lookup for the max offset
> > for each partition when you begin consuming, stopping when this position it
> > reached.
> >
> > But log compaction means that if an update to a piece of content arrives
> > with the same message key, then this will be written to the end so the
> > snapshot will be incomplete.
> >
> >
> > Another thought is to make use of the cleaner point. Currently Kafka writes
> > out to a "cleaner-offset-checkpoint" file in each data directory which is
> > written to after log compaction completes.
> >
> > If the consumer was able to access the cleaner-offset-checkpoint you would
> > be able to consume up to this point, check the point was still the same,
> > and compaction had not yet occurred, and therefore determine you had
> > receive everything at least once. (Assuming there was no race condition
> > between compaction and writing to the file)
> >
> >
> > Has anybody got any thoughts?
> >
> > Will
> >


Re: Consuming a snapshot from log compacted topic

Posted by Jay Kreps <ja...@gmail.com>.
If you catch up off a compacted topic and keep consuming then you will
become consistent with the log.

I think what you are saying is that you want to create a snapshot from the
Kafka topic but NOT do continual reads after that point. For example you
might be creating a backup of the data to a file.

I agree that this isn't as easy as it could be. As you say the only
solution we have is that timeout which doesn't differentiate between GC
stall in your process and no more messages left so you would need to tune
the timeout. This is admittedly kind of a hack.

You are also correct and perceptive to notice that if you check the end of
the log then begin consuming and read up to that point compaction may have
already kicked in (if the reading takes a while) and hence you might have
an incomplete snapshot.

I think there are two features we could add that would make this easier:
1. Make the cleaner point configurable on a per-topic basis. This feature
would allow you to control how long the full log is retained and when
compaction can kick in. This would give a configurable SLA for the reader
process to catch up.
2. Make the log end offset available more easily in the consumer.

-Jay



On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w....@gmail.com>
wrote:

> We are currently using Kafka 0.8.1.1 with log compaction in order to
> provide streams of messages to our clients.
>
> As well as constantly consuming the stream, one of our use cases is to
> provide a snapshot, meaning the user will receive a copy of every message
> at least once.
>
> Each one of these messages represents an item of content in our system.
>
>
> The problem comes when determining if the client has actually reached the
> end of the topic.
>
> The standard Kafka way of dealing with this seems to be by using a
> ConsumerTimeoutException, but we are frequently getting this error when the
> end of the topic has not been reached or even it may take a long time
> before a timeout naturally occurs.
>
>
> On first glance it would seem possible to do a lookup for the max offset
> for each partition when you begin consuming, stopping when this position it
> reached.
>
> But log compaction means that if an update to a piece of content arrives
> with the same message key, then this will be written to the end so the
> snapshot will be incomplete.
>
>
> Another thought is to make use of the cleaner point. Currently Kafka writes
> out to a "cleaner-offset-checkpoint" file in each data directory which is
> written to after log compaction completes.
>
> If the consumer was able to access the cleaner-offset-checkpoint you would
> be able to consume up to this point, check the point was still the same,
> and compaction had not yet occurred, and therefore determine you had
> receive everything at least once. (Assuming there was no race condition
> between compaction and writing to the file)
>
>
> Has anybody got any thoughts?
>
> Will
>