Posted to users@kafka.apache.org by Taylor Gautier <tg...@tagged.com> on 2011/09/22 01:18:48 UTC

get offsets?

Hi,

Using Kafka 0.6.

I'm trying to use get offsets but it doesn't seem to work as I expect.  I
have a test topic that has some messages in it.  Here's the output of a
test client that starts from offset 0 and prints all messages/offsets for
the topic:

Consumed message:foo offset: 12
Consumed message:bar offset: 24
Consumed message:foo offset: 36
Consumed message:bar offset: 48
Consumed message:hello offset: 62
Consumed message:world offset: 76

Here's a class to print the last n offsets:

public class SimpleConsumerDemo {
  public static void main(String[] args) {
    // host, port, socket timeout (ms), buffer size
    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", 9092, 1000, 1024);

    // getOffsetsBefore(topic, partition, time, maxNumOffsets);
    // time = -1L asks for offsets before "now", i.e. the latest
    long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 1);
    for (long l : offsets) {
      System.out.println("offset: " + l);
    }
  }
}


Running this as above, with 1 offset, yields expected results:

----output-----

offset: 76


However, asking for 3 offsets yields unexpected results:


change to: long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 3);

----output-----

offset: 76

offset: 0


I expected:


----output-----

offset: 76

offset: 62

offset: 48

Any idea why I did not get what I was looking for, or what I am doing wrong?

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
Consider a log system which Kafka is designed for.

I would like operations such as:

logtail -f <some_stream>

logtail -f -n 1000 <some_stream>

logtail -f -t10h <some_stream>

logtail -f -t0900 <some_stream>

in other words, I want to watch messages in real time off the log stream,
but I also want to go back in time and watch them. For example, suppose I
have written CEP or other scripts that watch log messages for patterns and
detect activity I am interested in. I may do so "on the fly", and I may want
to "go back in time" in case an event I am interested in happened in the
past and I want to validate the code I am writing.
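As a Kafka-independent sketch of what the `-n` flavor would mean (hypothetical `LogTail` class; an in-memory list stands in for the topic, sidestepping the offset-mapping problem this thread is about):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "logtail -f -n <N>": start N messages back, then follow new ones.
// A real consumer would need the broker to translate "N messages back" into
// a byte offset, which is exactly the missing piece discussed here.
public class LogTail {
    private final List<String> log = new ArrayList<>();

    public void append(String msg) { log.add(msg); }

    // Returns the last n messages; a follower would then continue
    // consuming from the end of this window.
    public List<String> tail(int n) {
        int start = Math.max(0, log.size() - n);
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        LogTail t = new LogTail();
        for (String m : new String[]{"foo", "bar", "hello", "world"}) t.append(m);
        System.out.println(t.tail(2)); // [hello, world]
    }
}
```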


On Thu, Sep 22, 2011 at 6:07 AM, Chris Burroughs
<ch...@gmail.com>wrote:

> On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > I see that kafka-87 addresses this with a request for having a time based
> > index, this would be relatively useful, but I also would like to have a
> way
> > to go back say 1,000 messages.  Other than walking backwards one segment
> at
> > a time, and then scanning forward from there, do you have any suggestions
> > how this might be done or is it also a feature request?
>
> Could you elaborate a little on your use case where you need to rewind
> by a fixed number of messages?
>

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
We thought of another idea - if you treat the messages as a reverse linked
list, then you write the offset of the previous message at the end of each
message.  Thus a message format would look like:

header
msg
offset of msg

This would allow walking backwards at the cost of a linear scan - if you
assume you don't do this often, it adds no cost at write or read time
beyond a tiny (negligible) penalty in space.
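A rough sketch of that framing in plain Java (hypothetical layout: a 4-byte length, the payload, then the 8-byte offset of the previous message):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed format: [4-byte length][payload][8-byte prev offset].
// Walking backward costs one read per message; forward readers pay only the
// 8 extra bytes per message.
public class BackLinkedLog {
    private final ByteBuffer buf = ByteBuffer.allocate(1024);
    private long lastOffset = -1L; // offset of the newest message, -1 if empty

    public void append(byte[] payload) {
        long offset = buf.position();
        buf.putInt(payload.length).put(payload).putLong(lastOffset);
        lastOffset = offset;
    }

    // Walk backward from the tail by following the trailing prev-offset links.
    public List<String> walkBack(int n) {
        List<String> out = new ArrayList<>();
        long off = lastOffset;
        while (off >= 0 && n-- > 0) {
            int len = buf.getInt((int) off);
            byte[] payload = new byte[len];
            buf.position((int) off + 4);
            buf.get(payload);
            out.add(new String(payload));
            off = buf.getLong((int) off + 4 + len); // previous message's offset
        }
        return out;
    }
}
```

Appending "foo", "bar", "baz" and then calling `walkBack(2)` yields the last two messages, newest first.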

On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com> wrote:

> I think we are going to go with Kafka itself hopefully if the code isn't
> too hard to update - it already has everything we need, we just change its
> receive message operation from:
>
> receive msg => write into log file
>
> receive msg => write into log file and get offset, write offset into index
> log file
>
> where index log file is just another topic that contains 64 bit offsets of
> the original log file.
>
> of course with batching and sendfile calls this may be trickier than we
> anticipate….
>
> On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <je...@gmail.com>wrote:
>
>> This was something i've asked for in the past as well.  To neha's comment,
>> sounds like you'd need some kind of table to maintain the list of offsets
>> and which segment they live in?
>>
>>
>>
>> On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
>> <ch...@gmail.com>wrote:
>>
>> > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
>> > > I see that kafka-87 addresses this with a request for having a time
>> based
>> > > index, this would be relatively useful, but I also would like to have
>> a
>> > way
>> > > to go back say 1,000 messages.  Other than walking backwards one
>> segment
>> > at
>> > > a time, and then scanning forward from there, do you have any
>> suggestions
>> > > how this might be done or is it also a feature request?
>> >
>> > Could you elaborate a little on your use case where you need to rewind
>> > by a fixed number of messages?
>> >
>>
>
>

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
Neha,

Wouldn't a table as you propose basically be a list of offsets, which would
look pretty much exactly like a topic full of offsets?  I mean, the
shortcomings you list are essentially the shortcomings of any index, modulo
the part about batching, which I suspect could be implemented relatively
easily… no?

On Thu, Sep 22, 2011 at 2:11 PM, Jeffrey Damick <je...@gmail.com>wrote:

> But why search when you could track and index it directly?
>
> FWIW, we considered doing what someone suggested below as well, but for
> many of the same reasons listed below, and the added complexity if you want
> it to be fault tolerant with tiered brokers, we unfortunately had to abandon
> going further with Kafka.
>
>
> On Thu, Sep 22, 2011 at 4:55 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > >> To neha's comment, sounds like you'd need some kind of table to
> maintain
> > the list of offsets and which segment they live in?
> >
> > Not really. What I was suggesting is maintaining a table per log
> > segment, keyed by offset that maintains a mapping from offset to
> > num-messages-since-start-of-log-segment.
> > This would allow a binary search to look for the closest offset to the
> > nth message in the log segment.
> >
> > >> for every write of a message to topic foo, write the new offset into
> > topic foo_i.
> >
> > If I understand the use case correctly, what you want is, at some
> > point in the consumption for topic 'foo', you want to "go back n
> > messages".
> >
> > I can see a number of problems with the approach mentioned above,
> >
> > 1. Writing offsets to topic foo_i per message in foo will be
> > expensive. There is no batching on the producer side. And also, no way
> > to ensure that each message in foo and its corresponding message in
> > foo_i are flushed to disk atomically on the broker. There
> > could be a window of error there if the broker crashes.
> > 2. Garbage collection for topics foo and foo_i would have to be in lock
> > step.
> > 3. Depending on how frequently you need to "go back n messages", it
> > could lead to random disk IO for topic foo_i.
> > 4. This solution would lead to an extra topic per real topic, since
> > foo_i will not be able to encode a topic name in its message, or else
> > messages would be variable sized.
> > 5. Even if we assume you had the data correctly written to both
> > topics, while consuming topic 'foo', you'd have to keep track of how
> > many messages you have consumed, to be able to define an offset for
> > topic foo_i. That means, in addition to a consumed offset, you'd have
> > to keep track of number of messages consumed for that offset.
> >
> >
> > I think this information could be maintained by the broker hosting the
> > topic partition in a more consistent manner.
> >
> > Thanks,
> > Neha
> >
> > On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> > >
> > > I'm not sure this is the same use case - here you just need to remember
> > the
> > > last good offset, which is only ever written by the last consumer.  If
> > > things fail you just come back from the last good offset.
> > >
> > > There are many ways to store the last known good offset - in memory, in
> a
> > > filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer
> > here
> > > doesn't work as it's intended to make the simple thing work simply, but
> > not
> > > necessarily act as building blocks for more complex use cases, afaik.
> > >
> > > On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to add a somewhat similar use case, for going back to a
> > specific
> > > > offset (maybe this will be addressed with the time indexing thing in
> > Kafka
> > > > 87, by the way, is any of the upcoming features documented?)
> > > >
> > > > Let's say I want to design a fault tolerant system around Kafka's
> > ability
> > > > to
> > > > replay messages from a specific offset.
> > > > A chain of consumers reads messages from kafka, and the last one
> dumps
> > data
> > > > into some database.
> > > > What I want to achieve is, if any of the consumers fails, then a
> system
> > > > detects the failure, and replays messages from a specific offset.
> > > >
> > > > How this can be achieved:
> > > > 1) Instead of having the consumer reading from Kafka update ZK with
> the
> > > > latest offset, I have the _last_ node in the consumer chain, the one
> > that
> > > > writes to the DB, update ZK with an offset.
> > > > 2) A monitoring system detects node failures along the entire
> consumer
> > > > chain
> > > > 3) If the monitoring system detects a failure condition, then the
> > consumer
> > > > reading from Kafka will reset to the last "successful" offset written
> > into
> > > > ZK
> > > >
> > > > So far, I believe this differs from 0.6 functionality in the
> following
> > > > ways:
> > > > - The default KafkaConsumer is the one that updates ZK on a regular
> > basis.
> > > >  Instead I would like the ability to update ZK from a node other than
> > the
> > > > one directly consuming from Kafka.
> > > > - I would like the ability for the KafkaConsumer to reset to the last
> > > > offset
> > > > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer
> can
> > be
> > > > given an offset, but I don't want to lose the nice auto load
> balancing
> > > > features in the smart Consumer.
> > > >
> > > > Any thoughts?
> > > >
> > > > Many thanks.
> > > >
> > > > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tgautier@tagged.com
> >
> > > > wrote:
> > > >
> > > > > I think we are going to go with Kafka itself hopefully if the code
> > isn't
> > > > > too
> > > > > hard to update - it already has everything we need, we just change
> > its
> > > > > receive message operation from:
> > > > >
> > > > > receive msg => write into log file
> > > > >
> > > > > receive msg => write into log file and get offset, write offset
> into
> > > > index
> > > > > log file
> > > > >
> > > > > where index log file is just another topic that contains 64 bit
> > offsets
> > > > of
> > > > > the original log file.
> > > > >
> > > > > of course with batching and sendfile calls this may be trickier
> than
> > we
> > > > > anticipate….
> > > > >
> > > > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <
> > jeffreydamick@gmail.com
> > > > > >wrote:
> > > > >
> > > > > > This was something i've asked for in the past as well.  To neha's
> > > > > comment,
> > > > > > sounds like you'd need some kind of table to maintain the list of
> > > > offsets
> > > > > > and which segment they live in?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > > > <ch...@gmail.com>wrote:
> > > > > >
> > > > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > > > I see that kafka-87 addresses this with a request for having
> a
> > time
> > > > > > based
> > > > > > > > index, this would be relatively useful, but I also would like
> > to
> > > > have
> > > > > a
> > > > > > > way
> > > > > > > > to go back say 1,000 messages.  Other than walking backwards
> > one
> > > > > > segment
> > > > > > > at
> > > > > > > > a time, and then scanning forward from there, do you have any
> > > > > > suggestions
> > > > > > > > how this might be done or is it also a feature request?
> > > > > > >
> > > > > > > Could you elaborate a little on your use case where you need to
> > > > rewind
> > > > > > > by a fixed number of messages?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > --
> > > > *Evan Chan*
> > > > Senior Software Engineer |
> > > > ev@ooyala.com | (650) 996-4600
> > > > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > > > @ooyala<http://www.twitter.com/ooyala>
> > > >
> >
>

Re: get offsets?

Posted by Jeffrey Damick <je...@gmail.com>.
But why search when you could track and index it directly?

FWIW, we considered doing what someone suggested below as well, but for
many of the same reasons listed below, and the added complexity if you want
it to be fault tolerant with tiered brokers, we unfortunately had to abandon
going further with Kafka.


On Thu, Sep 22, 2011 at 4:55 PM, Neha Narkhede <ne...@gmail.com>wrote:

> >> To neha's comment, sounds like you'd need some kind of table to maintain
> the list of offsets and which segment they live in?
>
> Not really. What I was suggesting is maintaining a table per log
> segment, keyed by offset that maintains a mapping from offset to
> num-messages-since-start-of-log-segment.
> This would allow a binary search to look for the closest offset to the
> nth message in the log segment.
>
> >> for every write of a message to topic foo, write the new offset into
> topic foo_i.
>
> If I understand the use case correctly, what you want is, at some
> point in the consumption for topic 'foo', you want to "go back n
> messages".
>
> I can see a number of problems with the approach mentioned above,
>
> 1. Writing offsets to topic foo_i per message in foo will be
> expensive. There is no batching on the producer side. And also, no way
> to ensure that each message in foo and its corresponding message in
> foo_i are flushed to disk atomically on the broker. There
> could be a window of error there if the broker crashes.
> 2. Garbage collection for topics foo and foo_i would have to be in lock
> step.
> 3. Depending on how frequently you need to "go back n messages", it
> could lead to random disk IO for topic foo_i.
> 4. This solution would lead to an extra topic per real topic, since
> foo_i will not be able to encode a topic name in its message, or else
> messages would be variable sized.
> 5. Even if we assume you had the data correctly written to both
> topics, while consuming topic 'foo', you'd have to keep track of how
> many messages you have consumed, to be able to define an offset for
> topic foo_i. That means, in addition to a consumed offset, you'd have
> to keep track of number of messages consumed for that offset.
>
>
> I think this information could be maintained by the broker hosting the
> topic partition in a more consistent manner.
>
> Thanks,
> Neha
>
> On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com>
> wrote:
> >
> > I'm not sure this is the same use case - here you just need to remember
> the
> > last good offset, which is only ever written by the last consumer.  If
> > things fail you just come back from the last good offset.
> >
> > There are many ways to store the last known good offset - in memory, in a
> > filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer
> here
> > doesn't work as it's intended to make the simple thing work simply, but
> not
> > necessarily act as building blocks for more complex use cases, afaik.
> >
> > On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to add a somewhat similar use case, for going back to a
> specific
> > > offset (maybe this will be addressed with the time indexing thing in
> Kafka
> > > 87, by the way, is any of the upcoming features documented?)
> > >
> > > Let's say I want to design a fault tolerant system around Kafka's
> ability
> > > to
> > > replay messages from a specific offset.
> > > A chain of consumers reads messages from kafka, and the last one dumps
> data
> > > into some database.
> > > What I want to achieve is, if any of the consumers fails, then a system
> > > detects the failure, and replays messages from a specific offset.
> > >
> > > How this can be achieved:
> > > 1) Instead of having the consumer reading from Kafka update ZK with the
> > > latest offset, I have the _last_ node in the consumer chain, the one
> that
> > > writes to the DB, update ZK with an offset.
> > > 2) A monitoring system detects node failures along the entire consumer
> > > chain
> > > 3) If the monitoring system detects a failure condition, then the
> consumer
> > > reading from Kafka will reset to the last "successful" offset written
> into
> > > ZK
> > >
> > > So far, I believe this differs from 0.6 functionality in the following
> > > ways:
> > > - The default KafkaConsumer is the one that updates ZK on a regular
> basis.
> > >  Instead I would like the ability to update ZK from a node other than
> the
> > > one directly consuming from Kafka.
> > > - I would like the ability for the KafkaConsumer to reset to the last
> > > offset
> > > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can
> be
> > > given an offset, but I don't want to lose the nice auto load balancing
> > > features in the smart Consumer.
> > >
> > > Any thoughts?
> > >
> > > Many thanks.
> > >
> > > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> > > wrote:
> > >
> > > > I think we are going to go with Kafka itself hopefully if the code
> isn't
> > > > too
> > > > hard to update - it already has everything we need, we just change
> its
> > > > receive message operation from:
> > > >
> > > > receive msg => write into log file
> > > >
> > > > receive msg => write into log file and get offset, write offset into
> > > index
> > > > log file
> > > >
> > > > where index log file is just another topic that contains 64 bit
> offsets
> > > of
> > > > the original log file.
> > > >
> > > > of course with batching and sendfile calls this may be trickier than
> we
> > > > anticipate….
> > > >
> > > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <
> jeffreydamick@gmail.com
> > > > >wrote:
> > > >
> > > > > This was something i've asked for in the past as well.  To neha's
> > > > comment,
> > > > > sounds like you'd need some kind of table to maintain the list of
> > > offsets
> > > > > and which segment they live in?
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > > <ch...@gmail.com>wrote:
> > > > >
> > > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > > I see that kafka-87 addresses this with a request for having a
> time
> > > > > based
> > > > > > > index, this would be relatively useful, but I also would like
> to
> > > have
> > > > a
> > > > > > way
> > > > > > > to go back say 1,000 messages.  Other than walking backwards
> one
> > > > > segment
> > > > > > at
> > > > > > > a time, and then scanning forward from there, do you have any
> > > > > suggestions
> > > > > > > how this might be done or is it also a feature request?
> > > > > >
> > > > > > Could you elaborate a little on your use case where you need to
> > > rewind
> > > > > > by a fixed number of messages?
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > --
> > > *Evan Chan*
> > > Senior Software Engineer |
> > > ev@ooyala.com | (650) 996-4600
> > > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > > @ooyala<http://www.twitter.com/ooyala>
> > >
>

Re: get offsets?

Posted by Neha Narkhede <ne...@gmail.com>.
>> To neha's comment, sounds like you'd need some kind of table to maintain the list of offsets and which segment they live in?

Not really. What I was suggesting is maintaining a table per log
segment, keyed by offset that maintains a mapping from offset to
num-messages-since-start-of-log-segment.
This would allow a binary search to look for the closest offset to the
nth message in the log segment.
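A rough sketch of that table (hypothetical `SegmentIndex` class; the offsets and counts are made up). Since the message counts grow monotonically with the byte offsets, a binary search on the counts finds the closest indexed offset at or before the nth message:

```java
import java.util.Arrays;

// Sketch of a per-segment table: sorted byte offsets alongside
// num-messages-since-start-of-log-segment at each offset.
public class SegmentIndex {
    private final long[] offsets; // sorted byte offsets within the segment
    private final int[] counts;   // messages since segment start, per offset

    public SegmentIndex(long[] offsets, int[] counts) {
        this.offsets = offsets;
        this.counts = counts;
    }

    // Byte offset of the latest indexed entry at or before the nth message.
    public long offsetForNthMessage(int n) {
        int i = Arrays.binarySearch(counts, n);
        if (i < 0) i = -i - 2;              // insertion point minus one
        return i < 0 ? offsets[0] : offsets[i];
    }
}
```

With entries {0→0, 12→1, 24→2, 36→3}, asking for the 2nd message lands on byte offset 24.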

>> for every write of a message to topic foo, write the new offset into topic foo_i.

If I understand the use case correctly, what you want is, at some
point in the consumption for topic 'foo', you want to "go back n
messages".

I can see a number of problems with the approach mentioned above,

1. Writing offsets to topic foo_i per message in foo will be
expensive. There is no batching on the producer side. And also, no way
to ensure that each message in foo and its corresponding message in
foo_i are flushed to disk atomically on the broker. There
could be a window of error there if the broker crashes.
2. Garbage collection for topics foo and foo_i would have to be in lock step.
3. Depending on how frequently you need to "go back n messages", it
could lead to random disk IO for topic foo_i.
4. This solution would lead to an extra topic per real topic, since
foo_i will not be able to encode a topic name in its message, or else
messages would be variable sized.
5. Even if we assume you had the data correctly written to both
topics, while consuming topic 'foo', you'd have to keep track of how
many messages you have consumed, to be able to define an offset for
topic foo_i. That means, in addition to a consumed offset, you'd have
to keep track of number of messages consumed for that offset.


I think this information could be maintained by the broker hosting the
topic partition in a more consistent manner.

Thanks,
Neha

On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com> wrote:
>
> I'm not sure this is the same use case - here you just need to remember the
> last good offset, which is only ever written by the last consumer.  If
> things fail you just come back from the last good offset.
>
> There are many ways to store the last known good offset - in memory, in a
> filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer here
> doesn't work as it's intended to make the simple thing work simply, but not
> necessarily act as building blocks for more complex use cases, afaik.
>
> On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to add a somewhat similar use case, for going back to a specific
> > offset (maybe this will be addressed with the time indexing thing in Kafka
> > 87, by the way, is any of the upcoming features documented?)
> >
> > Let's say I want to design a fault tolerant system around Kafka's ability
> > to
> > replay messages from a specific offset.
> > A chain of consumers reads messages from kafka, and the last one dumps data
> > into some database.
> > What I want to achieve is, if any of the consumers fails, then a system
> > detects the failure, and replays messages from a specific offset.
> >
> > How this can be achieved:
> > 1) Instead of having the consumer reading from Kafka update ZK with the
> > latest offset, I have the _last_ node in the consumer chain, the one that
> > writes to the DB, update ZK with an offset.
> > 2) A monitoring system detects node failures along the entire consumer
> > chain
> > 3) If the monitoring system detects a failure condition, then the consumer
> > reading from Kafka will reset to the last "successful" offset written into
> > ZK
> >
> > So far, I believe this differs from 0.6 functionality in the following
> > ways:
> > - The default KafkaConsumer is the one that updates ZK on a regular basis.
> >  Instead I would like the ability to update ZK from a node other than the
> > one directly consuming from Kafka.
> > - I would like the ability for the KafkaConsumer to reset to the last
> > offset
> > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
> > given an offset, but I don't want to lose the nice auto load balancing
> > features in the smart Consumer.
> >
> > Any thoughts?
> >
> > Many thanks.
> >
> > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> >
> > > I think we are going to go with Kafka itself hopefully if the code isn't
> > > too
> > > hard to update - it already has everything we need, we just change its
> > > receive message operation from:
> > >
> > > receive msg => write into log file
> > >
> > > receive msg => write into log file and get offset, write offset into
> > index
> > > log file
> > >
> > > where index log file is just another topic that contains 64 bit offsets
> > of
> > > the original log file.
> > >
> > > of course with batching and sendfile calls this may be trickier than we
> > > anticipate….
> > >
> > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <jeffreydamick@gmail.com
> > > >wrote:
> > >
> > > > This was something i've asked for in the past as well.  To neha's
> > > comment,
> > > > sounds like you'd need some kind of table to maintain the list of
> > offsets
> > > > and which segment they live in?
> > > >
> > > >
> > > >
> > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > <ch...@gmail.com>wrote:
> > > >
> > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > I see that kafka-87 addresses this with a request for having a time
> > > > based
> > > > > > index, this would be relatively useful, but I also would like to
> > have
> > > a
> > > > > way
> > > > > > to go back say 1,000 messages.  Other than walking backwards one
> > > > segment
> > > > > at
> > > > > > a time, and then scanning forward from there, do you have any
> > > > suggestions
> > > > > > how this might be done or is it also a feature request?
> > > > >
> > > > > Could you elaborate a little on your use case where you need to
> > rewind
> > > > > by a fixed number of messages?
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > *Evan Chan*
> > Senior Software Engineer |
> > ev@ooyala.com | (650) 996-4600
> > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > @ooyala<http://www.twitter.com/ooyala>
> >

Re: get offsets?

Posted by Evan Chan <ev...@ooyala.com>.
On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com> wrote:

> I'm not sure this is the same use case - here you just need to remember the
> last good offset, which is only ever written by the last consumer.  If
> things fail you just come back from the last good offset.
>
> There are many ways to store the last known good offset - in memory, in a
> filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer
> here
> doesn't work as it's intended to make the simple thing work simply, but not
> necessarily act as building blocks for more complex use cases, afaik.
>

Is there an easy way to tell the regular Consumer (that reads offsets from
ZK) to
1) read the offset from ZK upon demand, or
2) start reading from an offset passed in (that one has stored elsewhere).

This might be a feature others are interested in.   If not, I suppose I can
create a fork.
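For what it's worth, option (2) can be framed as a pluggable offset source. This is only a sketch, not the Kafka API: `OffsetStore` and `ResettableConsumer` are hypothetical names, and a real implementation would put the ZooKeeper read/write behind the interface:

```java
// Sketch of option (2): the consumer takes its starting offset from a
// pluggable store. A ZK-backed implementation would read the znode that the
// last node in the chain writes; an in-memory stub stands in here.
public class ResettableConsumer {
    public interface OffsetStore {
        long read();             // last known-good offset
        void write(long offset);
    }

    public static class InMemoryStore implements OffsetStore {
        private long offset;
        public long read() { return offset; }
        public void write(long offset) { this.offset = offset; }
    }

    private final OffsetStore store;
    private long position;

    public ResettableConsumer(OffsetStore store) {
        this.store = store;
        this.position = store.read(); // start from the stored offset
    }

    // On a detected failure, rewind to the last offset the end of the
    // chain confirmed as fully processed.
    public void resetToLastGood() { position = store.read(); }

    public long position() { return position; }
}
```

The last node in the chain would call `write()` after a successful DB commit, and the monitoring system would trigger `resetToLastGood()` when it detects a failure.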

-Evan


>
> On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to add a somewhat similar use case, for going back to a specific
> > offset (maybe this will be addressed with the time indexing thing in
> Kafka
> > 87, by the way, is any of the upcoming features documented?)
> >
> > Let's say I want to design a fault tolerant system around Kafka's ability
> > to
> > replay messages from a specific offset.
> > A chain of consumers reads messages from kafka, and the last one dumps
> data
> > into some database.
> > What I want to achieve is, if any of the consumers fails, then a system
> > detects the failure, and replays messages from a specific offset.
> >
> > How this can be achieved:
> > 1) Instead of having the consumer reading from Kafka update ZK with the
> > latest offset, I have the _last_ node in the consumer chain, the one that
> > writes to the DB, update ZK with an offset.
> > 2) A monitoring system detects node failures along the entire consumer
> > chain
> > 3) If the monitoring system detects a failure condition, then the
> consumer
> > reading from Kafka will reset to the last "successful" offset written
> into
> > ZK
> >
> > So far, I believe this differs from 0.6 functionality in the following
> > ways:
> > - The default KafkaConsumer is the one that updates ZK on a regular
> basis.
> >  Instead I would like the ability to update ZK from a node other than the
> > one directly consuming from Kafka.
> > - I would like the ability for the KafkaConsumer to reset to the last
> > offset
> > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
> > given an offset, but I don't want to lose the nice auto load balancing
> > features in the smart Consumer.
> >
> > Any thoughts?
> >
> > Many thanks.
> >
> > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> >
> > > I think we are going to go with Kafka itself hopefully if the code
> isn't
> > > too
> > > hard to update - it already has everything we need, we just change its
> > > receive message operation from:
> > >
> > > receive msg => write into log file
> > >
> > > receive msg => write into log file and get offset, write offset into
> > index
> > > log file
> > >
> > > where index log file is just another topic that contains 64 bit offsets
> > of
> > > the original log file.
> > >
> > > of course with batching and sendfile calls this may be trickier than we
> > > anticipate….
> > >
> > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <
> jeffreydamick@gmail.com
> > > >wrote:
> > >
> > > > This was something i've asked for in the past as well.  To neha's
> > > comment,
> > > > sounds like you'd need some kind of table to maintain the list of
> > offsets
> > > > and which segment they live in?
> > > >
> > > >
> > > >
> > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > <ch...@gmail.com>wrote:
> > > >
> > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > I see that kafka-87 addresses this with a request for having a
> time
> > > > based
> > > > > > index, this would be relatively useful, but I also would like to
> > have
> > > a
> > > > > way
> > > > > > to go back say 1,000 messages.  Other than walking backwards one
> > > > segment
> > > > > at
> > > > > > a time, can then scanning forward from there, do you have any
> > > > suggestions
> > > > > > how this might be done or is it also a feature request?
> > > > >
> > > > > Could you elaborate a little on your use case where you need to
> > rewind
> > > > > by a fixed number of messages?
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > *Evan Chan*
> > Senior Software Engineer |
> > ev@ooyala.com | (650) 996-4600
> > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > @ooyala<http://www.twitter.com/ooyala>
> >
>



-- 
--
*Evan Chan*
Senior Software Engineer |
ev@ooyala.com | (650) 996-4600
www.ooyala.com | blog <http://www.ooyala.com/blog> |
@ooyala<http://www.twitter.com/ooyala>

Re: get offsets?

Posted by Evan Chan <ev...@ooyala.com>.
On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com> wrote:

> I'm not sure this is the same use case - here you just need to remember the
> last good offset, which is only ever written by the last consumer.  If
> things fail you just come back from the last good offset.
>
> There are many ways to store the last known good offset - in memory, in a
> filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer
> here
> doesn't work as it's intended to make the simple thing work simply, but not
> necessarily act as building blocks for more complex use cases, afaik.
>

Is there an easy way to tell the regular Consumer (that reads offsets from
ZK) to
1) read the offset from ZK upon demand, or
2) start reading from an offset passed in (that one has stored elsewhere).

This might be a feature others are interested in.   If not, I suppose I can
create a fork.

-Evan


>
> On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to add a somewhat similar use case, for going back to a specific
> > offset (maybe this will be addressed with the time indexing thing in
> Kafka
> > 87, by the way, is any of the upcoming features documented?)
> >
> > Let's say I want to design a fault tolerant system around Kafka's ability
> > to
> > replay messages from a specific offset.
> > A chain of consumers reads messages from kafka, and the last one dumps
> data
> > into some database.
> > What I want to achieve is, if any of the consumers fails, then a system
> > detects the failure, and replays messages from a specific offset.
> >
> > How this can be achieved:
> > 1) Instead of having the consumer reading from Kafka update ZK with the
> > latest offset, I have the _last_ node in the consumer chain, the one that
> > writes to the DB, update ZK with an offset.
> > 2) A monitoring system detects node failures along the entire consumer
> > chain
> > 3) If the monitoring system detects a failure condition, then the
> consumer
> > reading from Kafka will reset to the last "successful" offset written
> into
> > ZK
> >
> > So far, I believe this differs from 0.6 functionality in the following
> > ways:
> > - The default KafkaConsumer is the one that updates ZK on a regular
> basis.
> >  Instead I would like the ability to update ZK from a node other than the
> > one directly consuming from Kafka.
> > - I would like the ability for the KafkaConsumer to reset to the last
> > offset
> > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
> > given an offset, but I don't want to lose the nice auto load balancing
> > features in the smart Consumer.
> >
> > Any thoughts?
> >
> > Many thanks.
> >
> > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> >
> > > I think we are going to go with Kafka itself hopefully if the code
> isn't
> > > too
> > > hard to update - it already has everything we need, we just change it's
> > > receive message operation from:
> > >
> > > receive msg => write into log file
> > >
> > > receive msg => write into log file and get offset, write offset into
> > index
> > > log file
> > >
> > > where index log file is just another topic that contains 64 bit offsets
> > of
> > > the original log file.
> > >
> > > of course with batching and sendfile calls this may be trickier than we
> > > anticipate….
> > >
> > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <
> jeffreydamick@gmail.com
> > > >wrote:
> > >
> > > > This was something i've asked for in the past as well.  To neha's
> > > comment,
> > > > sounds like you'd need some kind of table to maintain the list of
> > offsets
> > > > and which segment they live in?
> > > >
> > > >
> > > >
> > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > <ch...@gmail.com>wrote:
> > > >
> > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > I see that kafka-87 addresses this with a request for having a
> time
> > > > based
> > > > > > index, this would be relatively useful, but I also would like to
> > have
> > > a
> > > > > way
> > > > > > to go back say 1,000 messages.  Other than walking backwards one
> > > > segment
> > > > > at
> > > > > > a time, can then scanning forward from there, do you have any
> > > > suggestions
> > > > > > how this might be done or is it also a feature request?
> > > > >
> > > > > Could you elaborate a little on your use case where you need to
> > rewind
> > > > > by a fixed number of messages?
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > *Evan Chan*
> > Senior Software Engineer |
> > ev@ooyala.com | (650) 996-4600
> > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > @ooyala<http://www.twitter.com/ooyala>
> >
>



-- 
--
*Evan Chan*
Senior Software Engineer |
ev@ooyala.com | (650) 996-4600
www.ooyala.com | blog <http://www.ooyala.com/blog> |
@ooyala<http://www.twitter.com/ooyala>

Re: get offsets?

Posted by Neha Narkhede <ne...@gmail.com>.
>> To neha's comment, sounds like you'd need some kind of table to maintain the list of offsets and which segment they live in?

Not really. What I was suggesting is maintaining a table per log
segment, keyed by offset, that maps each offset to
num-messages-since-start-of-log-segment.
This would allow a binary search to look for the closest offset to the
nth message in the log segment.

>> for every write of a message to topic foo, write the new offset into topic foo_i.

If I understand the use case correctly, what you want is, at some point
while consuming topic 'foo', to "go back n messages".

I can see a number of problems with the approach mentioned above,

1. Writing offsets to topic foo_i per message in foo will be
expensive. There is no batching on the producer side. And also, no way
to ensure that each message in foo and its corresponding message in
foo_i are flushed to disk atomically on the broker. There
could be a window of error there if the broker crashes.
2. Garbage collection for topics foo and foo_i would have to be in lock step.
3. Depending on how frequently you need to "go back n messages", it
could lead to random disk IO for topic foo_i.
4. This solution would lead to an extra topic per real topic, since
foo_i will not be able to encode a topic name in its message, or else
messages would be variable sized.
5. Even if we assume you had the data correctly written to both
topics, while consuming topic 'foo', you'd have to keep track of how
many messages you have consumed, to be able to define an offset for
topic foo_i. That means, in addition to a consumed offset, you'd have
to keep track of number of messages consumed for that offset.


I think this information could be maintained by the broker hosting the
topic partition in a more consistent manner.
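The sparse per-segment table described above can be sketched as follows. This is purely illustrative (none of these names exist in Kafka 0.6): every k-th message's byte offset is recorded, keyed by how many messages precede it in the segment, so locating the n-th message reduces to a floor lookup plus a short forward scan.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of a sparse per-segment index: maps
// messages-since-start-of-segment -> byte offset of that message.
public class SegmentIndex {
    private final TreeMap<Long, Long> entries = new TreeMap<>();

    // Record that the message with `messagesSinceStart` messages before it
    // begins at `byteOffset` in the segment file.
    public void record(long messagesSinceStart, long byteOffset) {
        entries.put(messagesSinceStart, byteOffset);
    }

    // Byte offset of the closest indexed message at or before the n-th
    // message; the caller scans forward from there to reach message n.
    public long closestOffsetBefore(long n) {
        Map.Entry<Long, Long> e = entries.floorEntry(n);
        return e == null ? 0L : e.getValue(); // segment start if unindexed
    }
}
```

With entries every 100 messages, reaching "1,000 messages back" costs one tree lookup and at most 99 sequential message reads.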

Thanks,
Neha

On Thu, Sep 22, 2011 at 9:49 AM, Taylor Gautier <tg...@tagged.com> wrote:
>
> I'm not sure this is the same use case - here you just need to remember the
> last good offset, which is only ever written by the last consumer.  If
> things fail you just come back from the last good offset.
>
> There are many ways to store the last known good offset - in memory, in a
> filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer here
> doesn't work as it's intended to make the simple thing work simply, but not
> necessarily act as building blocks for more complex use cases, afaik.
>
> On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to add a somewhat similar use case, for going back to a specific
> > offset (maybe this will be addressed with the time indexing thing in Kafka
> > 87, by the way, is any of the upcoming features documented?)
> >
> > Let's say I want to design a fault tolerant system around Kafka's ability
> > to
> > replay messages from a specific offset.
> > A chain of consumers reads messages from kafka, and the last one dumps data
> > into some database.
> > What I want to achieve is, if any of the consumers fails, then a system
> > detects the failure, and replays messages from a specific offset.
> >
> > How this can be achieved:
> > 1) Instead of having the consumer reading from Kafka update ZK with the
> > latest offset, I have the _last_ node in the consumer chain, the one that
> > writes to the DB, update ZK with an offset.
> > 2) A monitoring system detects node failures along the entire consumer
> > chain
> > 3) If the monitoring system detects a failure condition, then the consumer
> > reading from Kafka will reset to the last "successful" offset written into
> > ZK
> >
> > So far, I believe this differs from 0.6 functionality in the following
> > ways:
> > - The default KafkaConsumer is the one that updates ZK on a regular basis.
> >  Instead I would like the ability to update ZK from a node other than the
> > one directly consuming from Kafka.
> > - I would like the ability for the KafkaConsumer to reset to the last
> > offset
> > in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
> > given an offset, but I don't want to lose the nice auto load balancing
> > features in the smart Consumer.
> >
> > Any thoughts?
> >
> > Many thanks.
> >
> > On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> >
> > > I think we are going to go with Kafka itself hopefully if the code isn't
> > > too
> > > hard to update - it already has everything we need, we just change it's
> > > receive message operation from:
> > >
> > > receive msg => write into log file
> > >
> > > receive msg => write into log file and get offset, write offset into
> > index
> > > log file
> > >
> > > where index log file is just another topic that contains 64 bit offsets
> > of
> > > the original log file.
> > >
> > > of course with batching and sendfile calls this may be trickier than we
> > > anticipate….
> > >
> > > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <jeffreydamick@gmail.com
> > > >wrote:
> > >
> > > > This was something i've asked for in the past as well.  To neha's
> > > comment,
> > > > sounds like you'd need some kind of table to maintain the list of
> > offsets
> > > > and which segment they live in?
> > > >
> > > >
> > > >
> > > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > > <ch...@gmail.com>wrote:
> > > >
> > > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > > I see that kafka-87 addresses this with a request for having a time
> > > > based
> > > > > > index, this would be relatively useful, but I also would like to
> > have
> > > a
> > > > > way
> > > > > > to go back say 1,000 messages.  Other than walking backwards one
> > > > segment
> > > > > at
> > > > > > a time, can then scanning forward from there, do you have any
> > > > suggestions
> > > > > > how this might be done or is it also a feature request?
> > > > >
> > > > > Could you elaborate a little on your use case where you need to
> > rewind
> > > > > by a fixed number of messages?
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > *Evan Chan*
> > Senior Software Engineer |
> > ev@ooyala.com | (650) 996-4600
> > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > @ooyala<http://www.twitter.com/ooyala>
> >

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
I'm not sure this is the same use case - here you just need to remember the
last good offset, which is only ever written by the last consumer.  If
things fail you just come back from the last good offset.

There are many ways to store the last known good offset - in memory, in a
filesystem, in memcache, in the db, or in zk.  Using the simpleconsumer here
doesn't work as it's intended to make the simple thing work simply, but not
necessarily act as building blocks for more complex use cases, afaik.

On Thu, Sep 22, 2011 at 9:41 AM, Evan Chan <ev...@ooyala.com> wrote:

> Hi everyone,
>
> I'd like to add a somewhat similar use case, for going back to a specific
> offset (maybe this will be addressed with the time indexing thing in Kafka
> 87, by the way, is any of the upcoming features documented?)
>
> Let's say I want to design a fault tolerant system around Kafka's ability
> to
> replay messages from a specific offset.
> A chain of consumers reads messages from kafka, and the last one dumps data
> into some database.
> What I want to achieve is, if any of the consumers fails, then a system
> detects the failure, and replays messages from a specific offset.
>
> How this can be achieved:
> 1) Instead of having the consumer reading from Kafka update ZK with the
> latest offset, I have the _last_ node in the consumer chain, the one that
> writes to the DB, update ZK with an offset.
> 2) A monitoring system detects node failures along the entire consumer
> chain
> 3) If the monitoring system detects a failure condition, then the consumer
> reading from Kafka will reset to the last "successful" offset written into
> ZK
>
> So far, I believe this differs from 0.6 functionality in the following
> ways:
> - The default KafkaConsumer is the one that updates ZK on a regular basis.
>  Instead I would like the ability to update ZK from a node other than the
> one directly consuming from Kafka.
> - I would like the ability for the KafkaConsumer to reset to the last
> offset
> in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
> given an offset, but I don't want to lose the nice auto load balancing
> features in the smart Consumer.
>
> Any thoughts?
>
> Many thanks.
>
> On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com>
> wrote:
>
> > I think we are going to go with Kafka itself hopefully if the code isn't
> > too
> > hard to update - it already has everything we need, we just change it's
> > receive message operation from:
> >
> > receive msg => write into log file
> >
> > receive msg => write into log file and get offset, write offset into
> index
> > log file
> >
> > where index log file is just another topic that contains 64 bit offsets
> of
> > the original log file.
> >
> > of course with batching and sendfile calls this may be trickier than we
> > anticipate….
> >
> > On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <jeffreydamick@gmail.com
> > >wrote:
> >
> > > This was something i've asked for in the past as well.  To neha's
> > comment,
> > > sounds like you'd need some kind of table to maintain the list of
> offsets
> > > and which segment they live in?
> > >
> > >
> > >
> > > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > > <ch...@gmail.com>wrote:
> > >
> > > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > > I see that kafka-87 addresses this with a request for having a time
> > > based
> > > > > index, this would be relatively useful, but I also would like to
> have
> > a
> > > > way
> > > > > to go back say 1,000 messages.  Other than walking backwards one
> > > segment
> > > > at
> > > > > a time, can then scanning forward from there, do you have any
> > > suggestions
> > > > > how this might be done or is it also a feature request?
> > > >
> > > > Could you elaborate a little on your use case where you need to
> rewind
> > > > by a fixed number of messages?
> > > >
> > >
> >
>
>
>
> --
> --
> *Evan Chan*
> Senior Software Engineer |
> ev@ooyala.com | (650) 996-4600
> www.ooyala.com | blog <http://www.ooyala.com/blog> |
> @ooyala<http://www.twitter.com/ooyala>
>

Re: get offsets?

Posted by Evan Chan <ev...@ooyala.com>.
Hi everyone,

I'd like to add a somewhat similar use case, for going back to a specific
offset (maybe this will be addressed with the time indexing thing in Kafka
87, by the way, is any of the upcoming features documented?)

Let's say I want to design a fault tolerant system around Kafka's ability to
replay messages from a specific offset.
A chain of consumers reads messages from kafka, and the last one dumps data
into some database.
What I want to achieve is, if any of the consumers fails, then a system
detects the failure, and replays messages from a specific offset.

How this can be achieved:
1) Instead of having the consumer reading from Kafka update ZK with the
latest offset, I have the _last_ node in the consumer chain, the one that
writes to the DB, update ZK with an offset.
2) A monitoring system detects node failures along the entire consumer chain
3) If the monitoring system detects a failure condition, then the consumer
reading from Kafka will reset to the last "successful" offset written into
ZK

So far, I believe this differs from 0.6 functionality in the following ways:
- The default KafkaConsumer is the one that updates ZK on a regular basis.
 Instead I would like the ability to update ZK from a node other than the
one directly consuming from Kafka.
- I would like the ability for the KafkaConsumer to reset to the last offset
in ZK (or maybe some arbitrary offset).  I know the SimpleConsumer can be
given an offset, but I don't want to lose the nice auto load balancing
features in the smart Consumer.

Any thoughts?

Many thanks.
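The control flow in steps 1-3 can be sketched with a made-up OffsetStore standing in for ZooKeeper (nothing here is Kafka 0.6 API): only the final, DB-writing stage of the pipeline commits an offset, and on a detected failure the Kafka-facing stage rewinds to the last committed value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical offset store; in the design above this would be a ZK znode.
interface OffsetStore {
    void commit(long offset);   // called only by the final (DB-writing) stage
    long lastCommitted();       // read by the Kafka-facing stage on reset
}

// In-memory stand-in, to show the contract.
class InMemoryOffsetStore implements OffsetStore {
    private final AtomicLong last = new AtomicLong(0L);
    public void commit(long offset) { last.set(offset); }
    public long lastCommitted() { return last.get(); }
}
```

On a failure signal from the monitor, the consumer would seek to `store.lastCommitted()` and replay from there, re-processing at-least-once through the chain.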

On Thu, Sep 22, 2011 at 8:22 AM, Taylor Gautier <tg...@tagged.com> wrote:

> I think we are going to go with Kafka itself hopefully if the code isn't
> too
> hard to update - it already has everything we need, we just change it's
> receive message operation from:
>
> receive msg => write into log file
>
> receive msg => write into log file and get offset, write offset into index
> log file
>
> where index log file is just another topic that contains 64 bit offsets of
> the original log file.
>
> of course with batching and sendfile calls this may be trickier than we
> anticipate….
>
> On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <jeffreydamick@gmail.com
> >wrote:
>
> > This was something i've asked for in the past as well.  To neha's
> comment,
> > sounds like you'd need some kind of table to maintain the list of offsets
> > and which segment they live in?
> >
> >
> >
> > On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> > <ch...@gmail.com>wrote:
> >
> > > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > > I see that kafka-87 addresses this with a request for having a time
> > based
> > > > index, this would be relatively useful, but I also would like to have
> a
> > > way
> > > > to go back say 1,000 messages.  Other than walking backwards one
> > segment
> > > at
> > > > a time, can then scanning forward from there, do you have any
> > suggestions
> > > > how this might be done or is it also a feature request?
> > >
> > > Could you elaborate a little on your use case where you need to rewind
> > > by a fixed number of messages?
> > >
> >
>



-- 
--
*Evan Chan*
Senior Software Engineer |
ev@ooyala.com | (650) 996-4600
www.ooyala.com | blog <http://www.ooyala.com/blog> |
@ooyala<http://www.twitter.com/ooyala>

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
I think we are going to go with Kafka itself hopefully if the code isn't too
hard to update - it already has everything we need, we just change its
receive message operation from:

receive msg => write into log file

to:

receive msg => write into log file and get offset, write offset into index
log file

where index log file is just another topic that contains 64 bit offsets of
the original log file.

of course with batching and sendfile calls this may be trickier than we
anticipate….

On Thu, Sep 22, 2011 at 8:17 AM, Jeffrey Damick <je...@gmail.com>wrote:

> This was something i've asked for in the past as well.  To neha's comment,
> sounds like you'd need some kind of table to maintain the list of offsets
> and which segment they live in?
>
>
>
> On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
> <ch...@gmail.com>wrote:
>
> > On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > > I see that kafka-87 addresses this with a request for having a time
> based
> > > index, this would be relatively useful, but I also would like to have a
> > way
> > > to go back say 1,000 messages.  Other than walking backwards one
> segment
> > at
> > > a time, can then scanning forward from there, do you have any
> suggestions
> > > how this might be done or is it also a feature request?
> >
> > Could you elaborate a little on your use case where you need to rewind
> > by a fixed number of messages?
> >
>

Re: get offsets?

Posted by Jeffrey Damick <je...@gmail.com>.
This was something I've asked for in the past as well.  To Neha's comment,
sounds like you'd need some kind of table to maintain the list of offsets
and which segment they live in?



On Thu, Sep 22, 2011 at 9:07 AM, Chris Burroughs
<ch...@gmail.com>wrote:

> On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> > I see that kafka-87 addresses this with a request for having a time based
> > index, this would be relatively useful, but I also would like to have a
> way
> > to go back say 1,000 messages.  Other than walking backwards one segment
> at
> > a time, can then scanning forward from there, do you have any suggestions
> > how this might be done or is it also a feature request?
>
> Could you elaborate a little on your use case where you need to rewind
> by a fixed number of messages?
>

Re: get offsets?

Posted by Chris Burroughs <ch...@gmail.com>.
On 09/21/2011 10:06 PM, Taylor Gautier wrote:
> I see that kafka-87 addresses this with a request for having a time based
> index, this would be relatively useful, but I also would like to have a way
> to go back say 1,000 messages.  Other than walking backwards one segment at
> a time, and then scanning forward from there, do you have any suggestions
> how this might be done or is it also a feature request?

Could you elaborate a little on your use case where you need to rewind
by a fixed number of messages?

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
Ok - here's a thought from my colleague….

the index we need for this can mostly be implemented using kafka itself -

for every write of a message to topic foo, write the new offset into topic
foo_i.   Now you have a topic that consists of  fixed length messages
(offsets) that essentially supports random access through the existing kafka
api, thus computing the nth message in the index is trivial, and that
message gives you the original offset to be used in the original topic.

The only trouble to do this outside of kafka (say a special producer) is
that as far as I can tell producers don't get told the offsets of the
messages they wrote, and even if they did more than one producer would end
up with an out of order index.
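The arithmetic behind such an index topic can be sketched in a few lines.
Everything here is illustrative: the class and method names are made up, and
the per-message header size is a placeholder, not the actual Kafka 0.6
on-disk overhead (which would need to be checked).

```java
// Sketch: random access into the proposed index topic ("foo_i"), whose
// messages are all the same size: each entry holds one 8-byte offset
// pointing into topic "foo".
public class IndexTopicMath {

    // Hypothetical sizes. PAYLOAD_BYTES is the 8-byte offset; HEADER_BYTES
    // stands in for whatever fixed per-message overhead the broker adds
    // (the real Kafka 0.6 value would need to be checked).
    static final int PAYLOAD_BYTES = 8;
    static final int HEADER_BYTES = 10;

    // Byte position in "foo_i" where the nth index entry (0-based) starts.
    static long entryPosition(long n) {
        return n * (HEADER_BYTES + PAYLOAD_BYTES);
    }

    // Which entry to fetch in order to rewind k messages, given how many
    // entries exist in total (clamped at the beginning of the topic).
    static long rewindEntry(long totalEntries, long k) {
        return Math.max(0, totalEntries - k);
    }
}
```

A consumer wanting the message 1,000 back would compute
rewindEntry(total, 1000), fetch the index entry starting at
entryPosition(...) from foo_i, and use the offset it contains against foo.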

On Wed, Sep 21, 2011 at 7:32 PM, Neha Narkhede <ne...@gmail.com>wrote:

> >> but I also would like to have a way to go back say 1,000 messages.
>
> Given the implementation we have today, that would be hard to implement in
> the absence of an index. Each message in a kafka log segment could be of a
> different size. Given that you are at some reference point in the segment
> and you want to go back "n" messages, there is no way to know what offset
> that might be. Even if you decide to do some kind of binary search on the
> offsets, it would be hard to guess how far away you are from the reference
> point.
>
> The binary search approach would work if we kept an index per log segment,
> that tracks every nth offset and the number of messages since the previous
> offset key.
>
> I'm not aware of any JIRA tracking this feature request. Could you file one
> ?
>
> Thanks,
> Neha
>
> On Wed, Sep 21, 2011 at 7:06 PM, Taylor Gautier <tg...@tagged.com>
> wrote:
>
> > I see (and I think I understand why it works this way).
> >
> > I see that kafka-87 addresses this with a request for having a time based
> > index, this would be relatively useful, but I also would like to have a
> way
> > to go back say 1,000 messages.  Other than walking backwards one segment
> at
> > a time and then scanning forward from there, do you have any suggestions
> > how this might be done or is it also a feature request?
> >
> > On Wed, Sep 21, 2011 at 5:14 PM, Joel Koshy <jj...@gmail.com> wrote:
> >
> > > Hi Taylor,
> > >
> > > This is an FAQ, that was asked some time ago as well:
> > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201108.mbox/%3CCAFbh0Q3QgsYXn4kjTTh0zZ9DJZ7J14tM5g24RFJSihFfbqDNyw@mail.gmail.com%3E
> > >
> > > GetOffsetShell doesn't return the offset of every message. It returns
> the
> > > offset of the first message in every segment file. If you provide a
> time,
> > > the offsets you get back are based on the last modified time of the
> > segment
> > > files.
> > >
> > > It may be a good idea to paraphrase the above to be displayed with the
> > > tool's help message.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Wed, Sep 21, 2011 at 4:18 PM, Taylor Gautier <tg...@tagged.com>
> > > wrote:
> > > > Hi,
> > > >
> > > > Using Kafka 0.6.
> > > >
> > > > I'm trying to use get offsets but it doesn't seem to work as I
> expect.
> >  I
> > > > have  a test topic that has some messages in it.  Here's the output
> of
> > a
> > > > test client that starts from offset 0 and prints all messages/offsets
> > for
> > > > the topic:
> > > >
> > > > Consumed message:foo offset: 12
> > > > Consumed message:bar offset: 24
> > > > Consumed message:foo offset: 36
> > > > Consumed message:bar offset: 48
> > > > Consumed message:hello offset: 62
> > > > Consumed message:world offset: 76
> > > >
> > > > Here's a class to print the last n offsets:
> > > >
> > > > public class SimpleConsumerDemo
> > > >
> > > > {
> > > >
> > > >  public static void main(String[] args)
> > > >
> > > >  {
> > > >
> > > >    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost",
> > 9092,
> > > > 1000, 1024);
> > > >
> > > >
> > > >    long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L,
> 1);
> > > >
> > > >    for (long l : offsets) {
> > > >
> > > >        System.out.println("offset: " + l);
> > > >
> > > >    }
> > > >
> > > >  }
> > > >
> > > > }
> > > >
> > > >
> > > > Running this as above, with 1 offset, yields expected results:
> > > >
> > > > ----output-----
> > > >
> > > > offset: 76
> > > >
> > > >
> > > > However, asking for 3 offsets yields unexpected results:
> > > >
> > > >
> > > > change to: long[] offsets = simpleConsumer.getOffsetsBefore("test",
> 0,
> > > -1L,
> > > > 3);
> > > >
> > > > ----output-----
> > > >
> > > > offset: 76
> > > >
> > > > offset: 0
> > > >
> > > >
> > > > I expected:
> > > >
> > > >
> > > > ----output-----
> > > >
> > > > offset: 76
> > > >
> > > > offset: 62
> > > >
> > > > offset: 48
> > > >
> > > > Any idea why I did not get what I was looking for/what I am doing
> > wrong?
> > > >
> > >
> >
>

Re: get offsets?

Posted by Neha Narkhede <ne...@gmail.com>.
>> but I also would like to have a way to go back say 1,000 messages.

Given the implementation we have today, that would be hard to do in the
absence of an index. Each message in a Kafka log segment can be a different
size, so if you are at some reference point in the segment and want to go
back "n" messages, there is no way to know what offset that might be. Even
if you did some kind of binary search on the offsets, it would be hard to
guess how far away you are from the reference point.

The binary search approach would work if we kept an index per log segment
that tracks every nth offset and the number of messages since the previous
offset key.
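A sparse per-segment index along these lines might look roughly as follows;
the class and method names are illustrative, not anything that exists in
Kafka itself.

```java
import java.util.TreeMap;

// Sketch of the sparse per-segment index described above: it records the
// byte offset of every nth message in the segment. To rewind, look up the
// nearest indexed message at or below the target, then scan forward in
// the log from that byte offset (the scan itself is omitted here).
public class SparseSegmentIndex {

    // message number within the segment -> byte offset within the segment
    private final TreeMap<Long, Long> entries = new TreeMap<>();
    private final int interval; // index every nth message

    public SparseSegmentIndex(int interval) {
        this.interval = interval;
    }

    // Called once per appended message.
    public void maybeIndex(long messageNumber, long byteOffset) {
        if (messageNumber % interval == 0) {
            entries.put(messageNumber, byteOffset);
        }
    }

    // Byte offset of the closest indexed message <= messageNumber; the
    // caller scans at most (interval - 1) messages forward from here.
    public long lookup(long messageNumber) {
        return entries.floorEntry(messageNumber).getValue();
    }
}
```

A consumer wanting to go back 1,000 messages from message m would call
lookup(m - 1000) and scan forward in the log to the exact position.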

I'm not aware of any JIRA tracking this feature request. Could you file one?

Thanks,
Neha

On Wed, Sep 21, 2011 at 7:06 PM, Taylor Gautier <tg...@tagged.com> wrote:

> I see (and I think I understand why it works this way).
>
> I see that kafka-87 addresses this with a request for having a time based
> index, this would be relatively useful, but I also would like to have a way
> to go back say 1,000 messages.  Other than walking backwards one segment at
> a time and then scanning forward from there, do you have any suggestions
> how this might be done or is it also a feature request?
>
> On Wed, Sep 21, 2011 at 5:14 PM, Joel Koshy <jj...@gmail.com> wrote:
>
> > Hi Taylor,
> >
> > This is an FAQ, that was asked some time ago as well:
> >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201108.mbox/%3CCAFbh0Q3QgsYXn4kjTTh0zZ9DJZ7J14tM5g24RFJSihFfbqDNyw@mail.gmail.com%3E
> >
> > GetOffsetShell doesn't return the offset of every message. It returns the
> > offset of the first message in every segment file. If you provide a time,
> > the offsets you get back are based on the last modified time of the
> segment
> > files.
> >
> > It may be a good idea to paraphrase the above to be displayed with the
> > tool's help message.
> >
> > Thanks,
> >
> > Joel
> >
> > On Wed, Sep 21, 2011 at 4:18 PM, Taylor Gautier <tg...@tagged.com>
> > wrote:
> > > Hi,
> > >
> > > Using Kafka 0.6.
> > >
> > > I'm trying to use get offsets but it doesn't seem to work as I expect.
>  I
> > > have  a test topic that has some messages in it.  Here's the output of
> a
> > > test client that starts from offset 0 and prints all messages/offsets
> for
> > > the topic:
> > >
> > > Consumed message:foo offset: 12
> > > Consumed message:bar offset: 24
> > > Consumed message:foo offset: 36
> > > Consumed message:bar offset: 48
> > > Consumed message:hello offset: 62
> > > Consumed message:world offset: 76
> > >
> > > Here's a class to print the last n offsets:
> > >
> > > public class SimpleConsumerDemo
> > >
> > > {
> > >
> > >  public static void main(String[] args)
> > >
> > >  {
> > >
> > >    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost",
> 9092,
> > > 1000, 1024);
> > >
> > >
> > >    long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 1);
> > >
> > >    for (long l : offsets) {
> > >
> > >        System.out.println("offset: " + l);
> > >
> > >    }
> > >
> > >  }
> > >
> > > }
> > >
> > >
> > > Running this as above, with 1 offset, yields expected results:
> > >
> > > ----output-----
> > >
> > > offset: 76
> > >
> > >
> > > However, asking for 3 offsets yields unexpected results:
> > >
> > >
> > > change to: long[] offsets = simpleConsumer.getOffsetsBefore("test", 0,
> > -1L,
> > > 3);
> > >
> > > ----output-----
> > >
> > > offset: 76
> > >
> > > offset: 0
> > >
> > >
> > > I expected:
> > >
> > >
> > > ----output-----
> > >
> > > offset: 76
> > >
> > > offset: 62
> > >
> > > offset: 48
> > >
> > > Any idea why I did not get what I was looking for/what I am doing
> wrong?
> > >
> >
>

Re: get offsets?

Posted by Taylor Gautier <tg...@tagged.com>.
I see (and I think I understand why it works this way).

I see that KAFKA-87 addresses this with a request for a time-based index,
which would be relatively useful, but I would also like a way to go back,
say, 1,000 messages.  Other than walking backwards one segment at a time and
then scanning forward from there, do you have any suggestions for how this
might be done, or is it also a feature request?

On Wed, Sep 21, 2011 at 5:14 PM, Joel Koshy <jj...@gmail.com> wrote:

> Hi Taylor,
>
> This is an FAQ, that was asked some time ago as well:
>
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201108.mbox/%3CCAFbh0Q3QgsYXn4kjTTh0zZ9DJZ7J14tM5g24RFJSihFfbqDNyw@mail.gmail.com%3E
>
> GetOffsetShell doesn't return the offset of every message. It returns the
> offset of the first message in every segment file. If you provide a time,
> the offsets you get back are based on the last modified time of the segment
> files.
>
> It may be a good idea to paraphrase the above to be displayed with the
> tool's help message.
>
> Thanks,
>
> Joel
>
> On Wed, Sep 21, 2011 at 4:18 PM, Taylor Gautier <tg...@tagged.com>
> wrote:
> > Hi,
> >
> > Using Kafka 0.6.
> >
> > I'm trying to use get offsets but it doesn't seem to work as I expect.  I
> > have  a test topic that has some messages in it.  Here's the output of a
> > test client that starts from offset 0 and prints all messages/offsets for
> > the topic:
> >
> > Consumed message:foo offset: 12
> > Consumed message:bar offset: 24
> > Consumed message:foo offset: 36
> > Consumed message:bar offset: 48
> > Consumed message:hello offset: 62
> > Consumed message:world offset: 76
> >
> > Here's a class to print the last n offsets:
> >
> > public class SimpleConsumerDemo
> >
> > {
> >
> >  public static void main(String[] args)
> >
> >  {
> >
> >    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", 9092,
> > 1000, 1024);
> >
> >
> >    long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 1);
> >
> >    for (long l : offsets) {
> >
> >        System.out.println("offset: " + l);
> >
> >    }
> >
> >  }
> >
> > }
> >
> >
> > Running this as above, with 1 offset, yields expected results:
> >
> > ----output-----
> >
> > offset: 76
> >
> >
> > However, asking for 3 offsets yields unexpected results:
> >
> >
> > change to: long[] offsets = simpleConsumer.getOffsetsBefore("test", 0,
> -1L,
> > 3);
> >
> > ----output-----
> >
> > offset: 76
> >
> > offset: 0
> >
> >
> > I expected:
> >
> >
> > ----output-----
> >
> > offset: 76
> >
> > offset: 62
> >
> > offset: 48
> >
> > Any idea why I did not get what I was looking for/what I am doing wrong?
> >
>

Re: get offsets?

Posted by Joel Koshy <jj...@gmail.com>.
Hi Taylor,

This is an FAQ that was asked some time ago as well:
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201108.mbox/%3CCAFbh0Q3QgsYXn4kjTTh0zZ9DJZ7J14tM5g24RFJSihFfbqDNyw@mail.gmail.com%3E

GetOffsetShell doesn't return the offset of every message. It returns the
offset of the first message in every segment file. If you provide a time,
the offsets you get back are based on the last modified time of the segment
files.
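A toy model of this behaviour may help explain the output earlier in the
thread. It assumes, per the description above, that for a latest-time (-1L)
request the broker returns the log-end offset followed by each segment's
start offset, capped at the requested count; it is not the actual broker
code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the getOffsetsBefore(-1L) behaviour described above: the
// broker returns the log-end offset followed by each segment's start
// offset (newest segment first), capped at the requested count. It never
// returns per-message offsets. Illustrative only.
public class OffsetRequestModel {

    static long[] latestOffsets(long logEndOffset, List<Long> segmentStarts,
                                int maxNumOffsets) {
        List<Long> result = new ArrayList<>();
        result.add(logEndOffset);
        for (long start : segmentStarts) { // assumed sorted newest-first
            if (result.size() >= maxNumOffsets) {
                break;
            }
            result.add(start);
        }
        long[] out = new long[result.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = result.get(i);
        }
        return out;
    }
}
```

With a single segment starting at 0 and a log-end offset of 76, asking for
3 offsets yields [76, 0], matching the "unexpected" output at the start of
this thread; there is no third offset to return.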

It may be a good idea to paraphrase the above in the tool's help message.

Thanks,

Joel

On Wed, Sep 21, 2011 at 4:18 PM, Taylor Gautier <tg...@tagged.com> wrote:
> Hi,
>
> Using Kafka 0.6.
>
> I'm trying to use get offsets but it doesn't seem to work as I expect.  I
> have  a test topic that has some messages in it.  Here's the output of a
> test client that starts from offset 0 and prints all messages/offsets for
> the topic:
>
> Consumed message:foo offset: 12
> Consumed message:bar offset: 24
> Consumed message:foo offset: 36
> Consumed message:bar offset: 48
> Consumed message:hello offset: 62
> Consumed message:world offset: 76
>
> Here's a class to print the last n offsets:
>
> public class SimpleConsumerDemo
>
> {
>
>  public static void main(String[] args)
>
>  {
>
>    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", 9092,
> 1000, 1024);
>
>
>    long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 1);
>
>    for (long l : offsets) {
>
>        System.out.println("offset: " + l);
>
>    }
>
>  }
>
> }
>
>
> Running this as above, with 1 offset, yields expected results:
>
> ----output-----
>
> offset: 76
>
>
> However, asking for 3 offsets yields unexpected results:
>
>
> change to: long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L,
> 3);
>
> ----output-----
>
> offset: 76
>
> offset: 0
>
>
> I expected:
>
>
> ----output-----
>
> offset: 76
>
> offset: 62
>
> offset: 48
>
> Any idea why I did not get what I was looking for/what I am doing wrong?
>