You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jay Kreps (JIRA)" <ji...@apache.org> on 2012/09/12 01:24:07 UTC

[jira] [Created] (KAFKA-506) Store logical offset in log

Jay Kreps created KAFKA-506:
-------------------------------

             Summary: Store logical offset in log
                 Key: KAFKA-506
                 URL: https://issues.apache.org/jira/browse/KAFKA-506
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Jay Kreps
            Assignee: Jay Kreps
             Fix For: 0.8


Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.

To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.

It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.

As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).

This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.

The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.

The following changes would be part of this:
1. The log format would now be
      8 byte offset
      4 byte message_size
      N byte message
2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.

I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.

Here are a few issues to be considered for the first patch:
1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Re: [jira] [Resolved] (KAFKA-506) Store logical offset in log

Posted by Taylor Gautier <tg...@gmail.com>.
Thanks Jay,

Sounds pretty good, though it seems you will likely be trading at least one
iop for the index, unless you can store all indexes in memory, which at a
certain size may not be feasible.  I'm just guessing though....

It seems using this new methodology there is no longer a need for Tagged's
special Kafka branch, as reverse indexing (m messages ago) is now natively
supported...that's nice.

And...in my new job I expect I may want to use Kafka as a commit log in a
very similar way...so the functionality you describe sounds useful.

On Mon, Oct 8, 2012 at 1:59 PM, Jay Kreps <ja...@gmail.com> wrote:

> Hi Taylor,
>
> These are good questions.
>
> 1. It is required. It seemed cleaner to have one mechanism for mapping the
> "log change number" to the physical message and make that efficient rather
> than have lots of options.
>
> 2. Zero-copy transfer is still used. The performance impact can be
> understood in two pieces: (1) finding the chunk of log you need, (2)
> transferring that chunk. This only impacts (1). There is some performance
> impact for (1), but I think it can be made fast enough that it will not be
> significant. My next task is to do some performance work on the log system
> and I will quantify the impact more then.
>
> 3. This is a good question. First you asked if this is a special case of
> general indexing. It is. But because indexing monotonically increasing
> fixed width numerical offsets can be done much more efficiently it is an
> important special case. The idea of creating more general indexes seems
> appealing at first, but when you think about it more you realize you need a
> query language, data model, and many other things for this to be useful and
> essentially you are implementing a database which isn't what Kafka is meant
> to be.
>
> Here is what this adds right off the bat with the patch I checked in:
> a. It is esthetically nice. The first message will have offset 0, the
> second message 1, the 100th message offset 100, etc.
> b. You can read the messages in reverse if you like. If the end of the log
> is 9876 then 100 messages before that is 9776.
> c. It is less error prone: There are no invalid offsets and no byte
> arithmetic.
> d. It fixes the commit() functionality with respect to compressed messages.
> Previously there was effectively no offset for messages inside of a
> compressed set, so one could only commit ones position at compressed
> message set boundaries. This made the semantics of compressed messages very
> problematic.
>
> One of the primary motivators is not the above items, but rather the
> ability to allow more sophisticated retention policies. Some systems at
> LinkedIn use Kafka as a kind of commit log. That is they take a stream of
> changes from Kafka, process them, and apply some munged version of this to
> a local search index, key-value store, or other data structure for serving.
> Virtually all of these systems have some notion of a primary key for
> records. The general problem these systems have to solve is what they need
> to do if they need to recreate their local state (say if a node fails, or
> they need to reprocess data in a different way). Since Kafka only will
> contain a fixed range of data, they can't really re-process from Kafka
> unless the data they serve is also time-based as we will have cleaned out
> old messages. But you could imagine a slightly different retention strategy
> in Kafka that allowed you to retain messages by some primary key. So rather
> than throwing away old segments you would have the option to "clean" old
> segments and just retain the latest record for each primary key. That would
> allow using the kafka log for all restore functionality and still guarantee
> that you restored the latest value for each key. This retention strategy
> would only make sense to use for topics that contain data with a primary
> key, so it would be optional. I think this is actually very powerful when
> combined with replication because it is a way to get a highly available
> "commit" or "restore" log.
>
> -Jay
>
>
> On Mon, Oct 8, 2012 at 12:40 PM, Taylor Gautier <tg...@gmail.com>
> wrote:
>
> > I seem to be late to the party on this one - can you summarize what
> > happened to the log files - and wire protocol - as a result of this
> change?
> >
> > I have some questions regarding what implications this change has:
> >
> > 1) is it required or optional, iotw can the physical offset mechanism
> still
> > be used?
> > 2) if required, is it going to change the ability of kafka for
> > ultra-efficient i/o (currently, near zero memory is used, sendfile keeps
> > things fast and tidy)
> > 3) What additional capabilities does this give - for example can one do
> > negative indexing and positive indexing from a given offset?
> >
> > I am wondering, is this a specific instance of a more general class of
> > indexing?
> >
> > By negative indexing, I mean for example at Tagged we wanted the
> capability
> > to retrieve the message at m messages ago from the top of the message
> > queue.
> >
> > To implement this, we patched kafka itself to create a shadow kafka topic
> > that for every message received in topic A, would write the offset of
> said
> > message from topic A into the shadow topic A'.  Since offsets are fixed
> > width, the topic A' is effectively an index and to find m messages ago is
> > simple math -->  read from offset n - m*(fixed message size) where n is
> the
> > current offset of A' and m is num messages ago which will result in an
> > offset in topic A..
> >
> > Can this new ability you have implemented provide that kind of
> > functionality?
> >
> > On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <ji...@apache.org>
> wrote:
> >
> > >
> > >      [
> > >
> >
> https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
> > ]
> > >
> > > Jay Kreps resolved KAFKA-506.
> > > -----------------------------
> > >
> > >     Resolution: Fixed
> > >
> > > Committed.
> > >
> > > > Store logical offset in log
> > > > ---------------------------
> > > >
> > > >                 Key: KAFKA-506
> > > >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> > > >             Project: Kafka
> > > >          Issue Type: Bug
> > > >    Affects Versions: 0.8
> > > >            Reporter: Jay Kreps
> > > >            Assignee: Jay Kreps
> > > >             Fix For: 0.8
> > > >
> > > >         Attachments: KAFKA-506-phase-2.patch,
> > > KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> > > KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> > > KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> > > KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> > > KAFKA-506-v4-changes-since-v3.patch
> > > >
> > > >
> > > > Currently we only support retention by dropping entire segment
> files. A
> > > more nuanced retention policy would allow dropping individual messages
> > from
> > > a segment file by recopying it. This is not currently possible because
> > the
> > > lookup structure we use to locate messages is based on the file offset
> > > directly.
> > > > To fix this we should move to a sequential, logical offset
> > (0,1,2,3,...)
> > > which would allow deleting individual messages (e.g. 2) without
> deleting
> > > the entire segment.
> > > > It is desirable to make this change in the 0.8 timeframe since we are
> > > already doing data format changes.
> > > > As part of this we would explicitly store the key field given by the
> > > producer for partitioning (right now there is no way for the consumer
> to
> > > find the value used for partitioning).
> > > > This combination of features would allow a key-based retention policy
> > > that would clean obsolete values either by a user defined key.
> > > > The specific use case I am targeting is a commit log for local state
> > > maintained by a process doing some kind of near-real-time processing.
> The
> > > process could log out its local state changes and be able to restore
> from
> > > this log in the event of a failure. However I think this is a broadly
> > > useful feature.
> > > > The following changes would be part of this:
> > > > 1. The log format would now be
> > > >       8 byte offset
> > > >       4 byte message_size
> > > >       N byte message
> > > > 2. The offsets would be changed to a sequential, logical number
> rather
> > > than the byte offset (e.g. 0,1,2,3,...)
> > > > 3. A local memory-mapped lookup structure will be kept for each log
> > > segment that contains the mapping from logical to physical offset.
> > > > I propose to break this into two patches. The first makes the log
> > format
> > > changes, but retains the physical offset. The second adds the lookup
> > > structure and moves to logical offset.
> > > > Here are a few issues to be considered for the first patch:
> > > > 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One
> > > surprising thing is that the offset is actually the offset of the next
> > > message. I think there are actually several uses for the current
> offset.
> > I
> > > would propose making this hold the current message offset since with
> > > logical offsets the next offset is always just current_offset+1. Note
> > that
> > > since we no longer require messages to be dense, it is not true that if
> > the
> > > next offset is N the current offset is N-1 (because N-1 may have been
> > > deleted). Thoughts or objections?
> > > > 2. Currently during iteration over a ByteBufferMessageSet we throw an
> > > exception if there are zero messages in the set. This is used to detect
> > > fetches that are smaller than a single message size. I think this
> > behavior
> > > is misplaced and should be moved up into the consumer.
> > > > 3. In addition to adding a key in Message, I made two other changes:
> > (1)
> > > I moved the CRC to the first field and made it cover the entire message
> > > contents (previously it only covered the payload), (2) I dropped
> support
> > > for Magic=0, effectively making the attributes field required, which
> > > simplifies the code (since we are breaking compatibility anyway).
> > >
> > > --
> > > This message is automatically generated by JIRA.
> > > If you think it was sent incorrectly, please contact your JIRA
> > > administrators
> > > For more information on JIRA, see:
> > http://www.atlassian.com/software/jira
> > >
> >
>

Re: [jira] [Resolved] (KAFKA-506) Store logical offset in log

Posted by Jay Kreps <ja...@gmail.com>.
Hi Taylor,

These are good questions.

1. It is required. It seemed cleaner to have one mechanism for mapping the
"log change number" to the physical message and make that efficient rather
than have lots of options.

2. Zero-copy transfer is still used. The performance impact can be
understood in two pieces: (1) finding the chunk of log you need, (2)
transferring that chunk. This only impacts (1). There is some performance
impact for (1), but I think it can be made fast enough that it will not be
significant. My next task is to do some performance work on the log system
and I will quantify the impact more then.

3. This is a good question. First you asked if this is a special case of
general indexing. It is. But because indexing monotonically increasing
fixed width numerical offsets can be done much more efficiently it is an
important special case. The idea of creating more general indexes seems
appealing at first, but when you think about it more you realize you need a
query language, data model, and many other things for this to be useful and
essentially you are implementing a database which isn't what Kafka is meant
to be.

Here is what this adds right off the bat with the patch I checked in:
a. It is esthetically nice. The first message will have offset 0, the
second message 1, the 100th message offset 100, etc.
b. You can read the messages in reverse if you like. If the end of the log
is 9876 then 100 messages before that is 9776.
c. It is less error prone: There are no invalid offsets and no byte
arithmetic.
d. It fixes the commit() functionality with respect to compressed messages.
Previously there was effectively no offset for messages inside of a
compressed set, so one could only commit ones position at compressed
message set boundaries. This made the semantics of compressed messages very
problematic.

One of the primary motivators is not the above items, but rather the
ability to allow more sophisticated retention policies. Some systems at
LinkedIn use Kafka as a kind of commit log. That is they take a stream of
changes from Kafka, process them, and apply some munged version of this to
a local search index, key-value store, or other data structure for serving.
Virtually all of these systems have some notion of a primary key for
records. The general problem these systems have to solve is what they need
to do if they need to recreate their local state (say if a node fails, or
they need to reprocess data in a different way). Since Kafka only will
contain a fixed range of data, they can't really re-process from Kafka
unless the data they serve is also time-based as we will have cleaned out
old messages. But you could imagine a slightly different retention strategy
in Kafka that allowed you to retain messages by some primary key. So rather
than throwing away old segments you would have the option to "clean" old
segments and just retain the latest record for each primary key. That would
allow using the kafka log for all restore functionality and still guarantee
that you restored the latest value for each key. This retention strategy
would only make sense to use for topics that contain data with a primary
key, so it would be optional. I think this is actually very powerful when
combined with replication because it is a way to get a highly available
"commit" or "restore" log.

-Jay


On Mon, Oct 8, 2012 at 12:40 PM, Taylor Gautier <tg...@gmail.com> wrote:

> I seem to be late to the party on this one - can you summarize what
> happened to the log files - and wire protocol - as a result of this change?
>
> I have some questions regarding what implications this change has:
>
> 1) is it required or optional, iotw can the physical offset mechanism still
> be used?
> 2) if required, is it going to change the ability of kafka for
> ultra-efficient i/o (currently, near zero memory is used, sendfile keeps
> things fast and tidy)
> 3) What additional capabilities does this give - for example can one do
> negative indexing and positive indexing from a given offset?
>
> I am wondering, is this a specific instance of a more general class of
> indexing?
>
> By negative indexing, I mean for example at Tagged we wanted the capability
> to retrieve the message at m messages ago from the top of the message
> queue.
>
> To implement this, we patched kafka itself to create a shadow kafka topic
> that for every message received in topic A, would write the offset of said
> message from topic A into the shadow topic A'.  Since offsets are fixed
> width, the topic A' is effectively an index and to find m messages ago is
> simple math -->  read from offset n - m*(fixed message size) where n is the
> current offset of A' and m is num messages ago which will result in an
> offset in topic A..
>
> Can this new ability you have implemented provide that kind of
> functionality?
>
> On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <ji...@apache.org> wrote:
>
> >
> >      [
> >
> https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
> ]
> >
> > Jay Kreps resolved KAFKA-506.
> > -----------------------------
> >
> >     Resolution: Fixed
> >
> > Committed.
> >
> > > Store logical offset in log
> > > ---------------------------
> > >
> > >                 Key: KAFKA-506
> > >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> > >             Project: Kafka
> > >          Issue Type: Bug
> > >    Affects Versions: 0.8
> > >            Reporter: Jay Kreps
> > >            Assignee: Jay Kreps
> > >             Fix For: 0.8
> > >
> > >         Attachments: KAFKA-506-phase-2.patch,
> > KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> > KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> > KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> > KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> > KAFKA-506-v4-changes-since-v3.patch
> > >
> > >
> > > Currently we only support retention by dropping entire segment files. A
> > more nuanced retention policy would allow dropping individual messages
> from
> > a segment file by recopying it. This is not currently possible because
> the
> > lookup structure we use to locate messages is based on the file offset
> > directly.
> > > To fix this we should move to a sequential, logical offset
> (0,1,2,3,...)
> > which would allow deleting individual messages (e.g. 2) without deleting
> > the entire segment.
> > > It is desirable to make this change in the 0.8 timeframe since we are
> > already doing data format changes.
> > > As part of this we would explicitly store the key field given by the
> > producer for partitioning (right now there is no way for the consumer to
> > find the value used for partitioning).
> > > This combination of features would allow a key-based retention policy
> > that would clean obsolete values either by a user defined key.
> > > The specific use case I am targeting is a commit log for local state
> > maintained by a process doing some kind of near-real-time processing. The
> > process could log out its local state changes and be able to restore from
> > this log in the event of a failure. However I think this is a broadly
> > useful feature.
> > > The following changes would be part of this:
> > > 1. The log format would now be
> > >       8 byte offset
> > >       4 byte message_size
> > >       N byte message
> > > 2. The offsets would be changed to a sequential, logical number rather
> > than the byte offset (e.g. 0,1,2,3,...)
> > > 3. A local memory-mapped lookup structure will be kept for each log
> > segment that contains the mapping from logical to physical offset.
> > > I propose to break this into two patches. The first makes the log
> format
> > changes, but retains the physical offset. The second adds the lookup
> > structure and moves to logical offset.
> > > Here are a few issues to be considered for the first patch:
> > > 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One
> > surprising thing is that the offset is actually the offset of the next
> > message. I think there are actually several uses for the current offset.
> I
> > would propose making this hold the current message offset since with
> > logical offsets the next offset is always just current_offset+1. Note
> that
> > since we no longer require messages to be dense, it is not true that if
> the
> > next offset is N the current offset is N-1 (because N-1 may have been
> > deleted). Thoughts or objections?
> > > 2. Currently during iteration over a ByteBufferMessageSet we throw an
> > exception if there are zero messages in the set. This is used to detect
> > fetches that are smaller than a single message size. I think this
> behavior
> > is misplaced and should be moved up into the consumer.
> > > 3. In addition to adding a key in Message, I made two other changes:
> (1)
> > I moved the CRC to the first field and made it cover the entire message
> > contents (previously it only covered the payload), (2) I dropped support
> > for Magic=0, effectively making the attributes field required, which
> > simplifies the code (since we are breaking compatibility anyway).
> >
> > --
> > This message is automatically generated by JIRA.
> > If you think it was sent incorrectly, please contact your JIRA
> > administrators
> > For more information on JIRA, see:
> http://www.atlassian.com/software/jira
> >
>

Re: [jira] [Resolved] (KAFKA-506) Store logical offset in log

Posted by Taylor Gautier <tg...@gmail.com>.
I seem to be late to the party on this one - can you summarize what
happened to the log files - and wire protocol - as a result of this change?

I have some questions regarding what implications this change has:

1) is it required or optional, iotw can the physical offset mechanism still
be used?
2) if required, is it going to change the ability of kafka for
ultra-efficient i/o (currently, near zero memory is used, sendfile keeps
things fast and tidy)
3) What additional capabilities does this give - for example can one do
negative indexing and positive indexing from a given offset?

I am wondering, is this a specific instance of a more general class of
indexing?

By negative indexing, I mean for example at Tagged we wanted the capability
to retrieve the message at m messages ago from the top of the message queue.

To implement this, we patched kafka itself to create a shadow kafka topic
that for every message received in topic A, would write the offset of said
message from topic A into the shadow topic A'.  Since offsets are fixed
width, the topic A' is effectively an index and to find m messages ago is
simple math -->  read from offset n - m*(fixed message size) where n is the
current offset of A' and m is num messages ago which will result in an
offset in topic A..

Can this new ability you have implemented provide that kind of
functionality?

On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <ji...@apache.org> wrote:

>
>      [
> https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
>
> Jay Kreps resolved KAFKA-506.
> -----------------------------
>
>     Resolution: Fixed
>
> Committed.
>
> > Store logical offset in log
> > ---------------------------
> >
> >                 Key: KAFKA-506
> >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> >             Project: Kafka
> >          Issue Type: Bug
> >    Affects Versions: 0.8
> >            Reporter: Jay Kreps
> >            Assignee: Jay Kreps
> >             Fix For: 0.8
> >
> >         Attachments: KAFKA-506-phase-2.patch,
> KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> KAFKA-506-v4-changes-since-v3.patch
> >
> >
> > Currently we only support retention by dropping entire segment files. A
> more nuanced retention policy would allow dropping individual messages from
> a segment file by recopying it. This is not currently possible because the
> lookup structure we use to locate messages is based on the file offset
> directly.
> > To fix this we should move to a sequential, logical offset (0,1,2,3,...)
> which would allow deleting individual messages (e.g. 2) without deleting
> the entire segment.
> > It is desirable to make this change in the 0.8 timeframe since we are
> already doing data format changes.
> > As part of this we would explicitly store the key field given by the
> producer for partitioning (right now there is no way for the consumer to
> find the value used for partitioning).
> > This combination of features would allow a key-based retention policy
> that would clean obsolete values either by a user defined key.
> > The specific use case I am targeting is a commit log for local state
> maintained by a process doing some kind of near-real-time processing. The
> process could log out its local state changes and be able to restore from
> this log in the event of a failure. However I think this is a broadly
> useful feature.
> > The following changes would be part of this:
> > 1. The log format would now be
> >       8 byte offset
> >       4 byte message_size
> >       N byte message
> > 2. The offsets would be changed to a sequential, logical number rather
> than the byte offset (e.g. 0,1,2,3,...)
> > 3. A local memory-mapped lookup structure will be kept for each log
> segment that contains the mapping from logical to physical offset.
> > I propose to break this into two patches. The first makes the log format
> changes, but retains the physical offset. The second adds the lookup
> structure and moves to logical offset.
> > Here are a few issues to be considered for the first patch:
> > 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One
> surprising thing is that the offset is actually the offset of the next
> message. I think there are actually several uses for the current offset. I
> would propose making this hold the current message offset since with
> logical offsets the next offset is always just current_offset+1. Note that
> since we no longer require messages to be dense, it is not true that if the
> next offset is N the current offset is N-1 (because N-1 may have been
> deleted). Thoughts or objections?
> > 2. Currently during iteration over a ByteBufferMessageSet we throw an
> exception if there are zero messages in the set. This is used to detect
> fetches that are smaller than a single message size. I think this behavior
> is misplaced and should be moved up into the consumer.
> > 3. In addition to adding a key in Message, I made two other changes: (1)
> I moved the CRC to the first field and made it cover the entire message
> contents (previously it only covered the payload), (2) I dropped support
> for Magic=0, effectively making the attributes field required, which
> simplifies the code (since we are breaking compatibility anyway).
>
> --
> This message is automatically generated by JIRA.
> If you think it was sent incorrectly, please contact your JIRA
> administrators
> For more information on JIRA, see: http://www.atlassian.com/software/jira
>

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468350#comment-13468350 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v3. We are almost there. A few more comments:

40. Log.append: It seems that it's easier if lastOffset returned is just nextOffset instead of nextOffset -1. Then, in KafkaApis, we can just pass end, instead of end+1 to ProducerResponseStatus.

41. OffsetIndex: When initializing mmap, if the index is mutable, shouldn't we move the position to the end of the buffer for append operations?

42. KafkaApis: It's useful to pass in brokerId to RequestPurgatory for debugging unit tests.

43. DumpLogSegments: Currently, the message iterator in FileMessageSet will stop when it hits the first non parsable message. So, we need to check if at the end of the message iteration, location == FileMessageSet.sizeInBytes(). If not, we should report the offset from which data is corrupted.

44. ConsumerIterator: The check for guarding small fetch size doesn't work. This is because in PartitionTopicInfo.enqueue(), we only add ByteBufferMessageSet that has positive valid bytes. We can log an error in PartitionTopicInfo.enqueue() and enqueue a special instance of FetchedDataChunk that indicates an error. In ConsumerIterator, when seeing the special FetchedDataChunk, it can throw an exception.

29. Yes, all parameters in the constructor in a case class are implicitly val.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v2.patch
    
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457338#comment-13457338 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Actually, there is another thing.

11. We need to change DefaultEventHandler to put the key data into messages sent to the broker. Also, Producer currently can take any type as key, do we want to restrict it to bytes or do we want to define a serializer for key too?
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469655#comment-13469655 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v5. 

50. There is still a potential issue in that shallowValidByteCount is a long and long value is not guaranteed to be exposed atomically without synchronization in java. So, 1 thread could see a partially updated long value. Thinking about this, since ByteBufferMessageSet is not updatable, is it better to compute validBytes once in the constructor?

51. ConsumerIterator: Could you include currentDataChunk.fetchOffset in the message string in MessageSizeTooLargeException? This will make debugging easier.

Since this is a large patch, it would be good if someone else takes a closer look at it too. At least Neha expressed interests in taking another look at the latest patch.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13465976#comment-13465976 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

2 additions to the preliminary comments -
- 3 unit tests fail on patch v2 - http://pastebin.com/ECUA2n1f
- It will be nice for maxIndexEntries to be a configurable property on the server
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13473821#comment-13473821 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

+1. Looks good and thanks for addressing the late review comments. One minor comment -

The following error statement is slightly misleading. The broker could either be in the middle of becoming a leader or a follower, not necessarily the former. 

fatal("Disk error while becoming leader.")
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-neha-post-review-v2.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13466626#comment-13466626 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v2. Some more comments:

20. Log:
20.1 findRange(): Add to the comment that now this method returns the largest segment file <= the requested offset.
20.2 close(): move the closing } for the for loop to a new line.
20.3 bytesSinceLastIndexEntry is only set but is never read.
20.4 append(): This method returns the offset of the first message to be appended. This is ok for the purpose of returning the offset to the producer. However, when determining whether all replicas have received the appended messages, we need to use the log end offset after the messages are appended. So, what we should do is to have append() return 2 offsets, one before the append and one after the append. We use the former in producer response and use the latter for the replica check. To avoid complicating this patch further, another approach is to, in the jira, have append return the log end offset after the append and use it in both producer response and replica check. We can file a separate jira to have append return 2 offsets.
20.5 read(): The trace statement: last format pattern should be %d instead of %s.
20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. It should be the offset of the next segment.
          segments.view.find(segment => targetOffset >= segment.start && targetOffset < logEndOffset)
20.7 There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?

21. LogSegment: bytesSinceLastIndexEntry needs to be updated in append().

22. FileMessageSet.searchFor(): The following check seems to be a bit strange. Shouldn't we use position + 12 or just position instead?
    while(position + 8 < size) {

23. OffsetIndex:
23.1 In the comment, "mutable index can be created to" seems to have a grammar bug.
23.2 mmap initialization: The following statement seems unnecessary. However, we do need to set the mapped buffer's position to end of file for mutable indexes. 
          idx.position(idx.limit).asInstanceOf[MappedByteBuffer]
23.3 append(): If index entry is full, should we automatically roll the log segment? It's ok if this is tracked in a separate jira.
23.4 makeReadOnly(): should we call flush after raf.setLength()? Also, should we remap the index file to the current length and make it read only?

24. LogManager.shutdown(): log indentation already adds LogManager in the prefix of each log entry.

25. KafkaApis:
25.1 handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?
25.2 ProducerRequestPurgatory: It seems that it's useful to keep the logIndent since it can distinguish logs from the ProducerRequestPurgatory and FetchRequestPurgatory. Also, it's probably useful to pass in brokerId to RequestPurgatory for debugging unit tests.

26. Partition: There are a few places that the first character of info log is changed to lower case. The current convention is to already use upper case.

27. javaapi.ByteBufferMessageSet: underlying should be private val.

28. DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now.

29. FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now.

30. PartitionData:
30.1 No need to redefine equals and hashcode since this is already a case class.
30.2 initialOffset is no longer needed.

31. PartitionTopicInfo.enqueue(): It seems that next can be computed using shallow iterator since the offset of a compressed message is always the offset of the last internal message.

32. ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression.

33. remove unused imports.

The following comment from the first round of review is still not addressed.
10. How do we handle the case that a consumer uses too small a fetch size?

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13471631#comment-13471631 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

Btw, which svn revision does patch v5 apply correctly on ? 
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469726#comment-13469726 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

50. It can actually only take Int values, so I don't think this can happen. I will file a follow-up clean-up issue to change sizeInBytes to be an Int (I had mentioned that earlier in the thread) since this anyways leads to innumerable safe-but-annoying casts to int. I think this is better than pre-computing it because in many cases we instantiate a ByteBufferMessageSet without necessarily using validBytes.

51. Yes, I will add this as part of the checkin.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v5.patch

Rebased patch and improved error message for the MessageSizeTooLargeException.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13474661#comment-13474661 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Ack, nice catch, fixed it.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-neha-post-review-v2.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2.patch

This patch is incremental from the previous one. I will rebase and provide an up-to-date patch that covers both phases, but this shows the new work required to support logical offsets.

I think I have addressed most of the comments on the original patch, except:
1. I have put off any performance optimization (avoiding recompression for replicas, memory-mapping the log, etc). I would like to break this into a separate JIRA and write a reasonable standalone Log benchmark that covers these cases and then work against that. I have several other cleanups I would like to do as well: (1) get rid of SegmentList, (2) move more functionality in Log into LogSegment.
2. I am not yet storing the key in the message, this may change the produce api slightly so i think this should be a seperate JIRA too.
3. Neha--I change most of the uses of magical numbers except where the concrete number is more clear.

Here is a description of the new changes.
- Offset now always refers to a logical log offset. I have tried to change any instances where offset meant file offset to instead use the terminology "position". References to file positions should only occur in Log.scala and classes internal to that.
- As in the previous patch MessageAndOffset gives three things: (1) the message, (2) the offset of THAT message, and (3) a helper method to calculate the next offset.
- Log.append() is responsible for maintaining the logEndOffset and using it to assign offsets to the messageset before appending to the log.
- Offsets are now assigned to compressed messages too. One nuance is that the offset of the wrapper message is equal to the last offset of the messages it contains. This will be more clear in the discussion of the offset search changes.
- Log.read now accepts a new argument maxOffset, which is the largest (logical) offset that will be returned in addition to the maxSize which limits the size in bytes.
- I have changed Log.read to now support sparse offsets. That is, it is valid to have missing offsets. This sparseness is needed both for the key-retention but also for the correct handling of compressed messages. I will describe the read path in more detail below.
- I moved FileMessageSet to the package kafka.log as already much of its functionality was specific to the log implementation.
- I changed FetchPurgatory back to use a simple counter for accumulated bytes. It was previously re-calculating the available bytes, but because this now is a more expensive operation, and because this calculation is redone for each topic/partition produce (i.e. potentially 200 times per produce request), I think this is better. This is less accurate, but since long poll is a heuristic anyway I think that is okay.
- I changed the default suffix of .kafka files to .log and added a new .index file that contains a sparse index of offset=>file_position to help efficiently resolve logical offsets.
- Entries are added to this index at a configurable frequency, controlled by a new configuration log.index.interval.bytes which defaults to 4096
- I removed numerous instances of byte calculations. I think this is a good thing for code quality.

Here is a description of the new read path.
1. First log tries to find the correct segment to read from using the existing binary search on log segments. I modified this search slightly in two ways. First we had a corner case bug which only occurred if you have two files with successive offsets (unlikely now, impossible before). Second, I now no longer check ranges but instead return the largest segment file less than or equal to the requested offset.
2. Once the segment is found we check the index on that segment. The index returns the largest offset less than or equal to the requested offset and the associated file position in the log file. This position represents a least upper bound on the position in the file, and it is the position from which we begin a linear search checking each message. The index itself is just a sorted sequence of (offset, position) pairs. Complete details are in the header comments on kafka.log.OffsetIndex.scala. It is not required that all messages have an entry in the OffsetIndex, instead there is a confgurable frequency in terms of bytes which is set in LogSegment. So, for example, we might have an entry every 4096 bytes. This frequency is approximate, since a single message may be larger than that.
3. Once we have a greatest lower bound on the location we use FileMessageSet.searchFor to search for the position of the first message with an offset at least as large as the target offset. This search just skips through the file checking the offset only.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13465936#comment-13465936 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Three preliminary comments from Neha while she does deeper interogations:
- Would be nice if the DumpLogSegment tool also dumped the contents of the index file
- This patch implicitly assumes file segments are limited to 2GB (I use a 4 byte position pointer in the index). Turns out this isn't true. Proposed fix is to limit log segments to 2GB.
- We decided the corner case with sparse messages at the end of a segment isn't really a corner case as it effects compressed messages too. So I will fix that in the scope of this patch.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456207#comment-13456207 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

I am going to begin phase two of this, implementing the logical offset management in Log.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Sam Meder (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13474652#comment-13474652 ] 

Sam Meder commented on KAFKA-506:
---------------------------------

I think you missed a change to KafkaETLContext. It needs:

diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index bca1757..9498169 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -205,7 +205,7 @@ public class KafkaETLContext {
             
             key.set(_index, _offset, messageAndOffset.message().checksum());
             
-            _offset = messageAndOffset.offset();  //increase offset
+            _offset = messageAndOffset.nextOffset();  //increase offset
             _count ++;  //increase count
             
             return true;

or something similar. As it stands it'll run forever...

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-neha-post-review-v2.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472753#comment-13472753 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

I have to mention that there is a possibility that some of my comments are not related to this patch directly, but were found while inspecting the new code closely :) Since you know the code better, feel free to file follow up JIRAs

1. Log

1.2 In findRange, the following statements runs the risk of hitting overflow, giving incorrect results from the binary search -
      val mid = ceil((high + low) / 2.0).toInt
Will probably be better to use
     val mid = low + ceil((high - low)/2.0).toInt
1.3 It seems that there are only 2 usages of the findRange API that takes in the array length . We already have an API that covers that use case - findRange[T <: Range](ranges: Array[T], value: Long) and this is used by a majority of API calls.
We can make the findRange method that has the actual binary search logic private and changes the 2 use cases in Log.scala to use the public method that assumes the array length.
1.4 In truncateTo, it is possible that the log file was successfully deleted but the index file was not. In this case, we would end up an unused index file that is never deleted from the kafka log directory.
1.5 In loadSegments, we need to rebuild any missing index files. Or it will error out at a later time. Do we have a follow up JIRA to cover this, it seems like a blocker to me.

2. LogManager
2.1 numPartitions is an unused class variable

3. FileMessageSet
3.1. In searchFor API, fix comment to mention that it searches for the first/least offset that is >= the target offset. Right now it says search for the last offset that is >= target offset
3.2 The searchFor API returns a pair of (offset, position). Right now, it does not always return the offset of the message at the returned position. If the file message set is sparse, it returns the offset of the next message, so the offset and position do not point to the same message in the log. Currently, we are not using the offset returned by the read() API, but in the future if we do, it will be good for it to be consistent.
3.3 In searchFor API, one of the statements uses 12 and the other uses MessageSet.LogOverhead. I think the while condition is better understood if it said MessageSet.LogOverhead.

4. LogSegment
4.1 It is better to make translateOffset return an Option. That way, every usage of this API will be forced to handle the case when the position was not found in the log segment.
4.2 I guess it might make sense to have all the places that uses this segment size to a an Int instead of Long. 

5. ConsumerIterator

Right now, while committing offsets for a compressed message set, the consumer can still get duplicates. However, we could probably fix this by making the ConsumerIterator smarter and discarding messages with offset < fetch offset.

6. ReplicaFetcherThread

When the follower fetches data from the leader, it uses log.append which re-computes the logical message ids. This involves recompression when the data is compressed, which it is in production. This can be avoided by making the data copy from leader -> follower smarter

7. MessageCompressionTest
There are 2 unused imports in this file

8. ByteBufferMessageSet
8.1 There are 3 unused imports in this file
8.2 The return statement in create() API is redundant

9. OffsetIndex
9.1 The last return statement in indexSlotFor is redundant
9.1 The first return statement in indexSlotFor can be safely removed by using case-match or putting the rest of the logic in the else part of if-else block.

10. Performance
Performance test to see the impact on throughput/latency if any due to this patch. What I am curious about is the performance impact due to the following, which are the changes that can impact performance as compared to pre KAFKA-506 -
10.1 Recompression of data during replica reads
10.2 Recompression of data to assign correct offsets inside a compressed message set
10.3 The linear search in the file segment to find the message with a given id. This depends on the index interval and there needs to be a balance between index size and index interval.
10.4 The impact of making the log memory mapped.
10.5 Overhead of using the index to read/write data in Kafka

Just to summarize so that we understand the follow up work and also the JIRAs that got automatically resolved due to this feature. Please correct me if I missed something here -

Follow up JIRAs
1. Retain key in producer (KAFKA-544)
2. Change sizeInBytes() to Int (KAFKA-556)
3. Fix consumer offset commit in ConsumerIterator for compressed message sets (KAFKA-546)
4. Remove the recompression involved while fetching data from follower to leader (KAFKA-557)
5. Rebuild missing index files (JIRA to be filed)

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469780#comment-13469780 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

I will free up tomorrow after Grace Hopper conference is over. Would like to take another closer look at the follow up patches. If you guys don't mind, please can we hold this at least for this weekend ?
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v5.patch

Rebased again and fixed the above issues to make v5

50. I looked into this. It is slightly subtle. The problem was that validBytes is cached in a local variable, and the incremental computation was done on the member variable in ByteBufferMessageSet. The next problem was that AbstractFetcherThread and the ConsumerIterator could both be calling this at the same time, which would lead to setting validBytes to 0 and then iterating over the messages to count the bytes. If the check and the computation occurred at precisely the same time it is possible for validBytes to return essentially any value. The fix is (1) avoid mucking with the MessageSet once it is handed over to ConsumerFetcherThread.processPartitionData, and (2) use a local variable to compute the validbytes, this way even if we do have future threading bugs the worst case is that we recompute the same cached value twice instead of accessing a partial computation (we could also make the variable volatile, but that doesn't really add any additional protection since we don't need precise memory visibility).

51. Done.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13474269#comment-13474269 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Ah, nice catch. Changed it to "Disk error during leadership change."

Checked in with the change.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-neha-post-review-v2.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v1.patch

Okay attached a fully rebased patch that contains both phase 1 and phase 2 changes.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13459975#comment-13459975 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

Thanks for the patch ! The log format change doesn't interfere with replication as of this patch. A few comments in addition to Jun's -

1. CompressionUtils: How about re-using the ByteBufferMessageSet.writeMessage() API for serializing the compressed message to a byte buffer ?

2. ByteBufferMessageSet.scala, FileMessageSet: Can we use MessageSet.LogOverhead instead of 12 for byte arithmetic ?

3. ConsumerIterator
The nextOffset issue for compressed message sets will get resolved when we actually use the sequential logical offsets. With that, the advantage is that the consumer will be able to fetch a message even if it is inside a compressed message. Today, there is no good way to achieve this unless we have level-2 message offsets for compressed messages. Even if we cannot make that change in time for replication, we can take this change and leave the message set iterator to return the next offset (valid fetch offset), just like we do today. So, either way, we are covered here.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v3.patch

New patch with a few new things:

I rebased a few more times to pick up changes.

WRT Neha's comments:
- I made maxIndexEntries configurable by adding the property log.index.max.size. I did this in terms of index file size rather than entries since the user doesn't really know the entry size but may care about the file size.
- For the failing tests: (1) The message set failure is due to scalatest not handling parameterized tests, i had fixed this but somehow it didn't make it into the previous patch. It is in the current one. testHWCheckpointWithFailuresSingleLogSegment is a timing assumption in that test. Fixed it by adding a sleep :-(. The producer test failure I cannot reproduce.
- Wrote a test case using compressed messages to try to produce the corner case at the end of a segment. But actually this turns out not to be possible with compressed messages since the numbering is by the last offset. So effectively our segments are always dense right now. As such I would rather wait until I refactor segment list to fix it since it will be duplicate work otherwise.
- Turns out that log segments are limited to 2GB already, via a restriction in the config. Not actually sure why this is. Given this limitation one cleanup that might be nice to do would be to convert MessageSet.sizeInBytes to an Int, which would remove a lot of casts. Since this is an unrelated cleanup I will not do it in this patch.
- I added support to DumpLogSegment tool to display the index file. I had to revert Jun's change to check that last offset=file size since this is no longer true.

Jun's Comments:
First of all, this is an impressively thorough code review. Thanks!
20.1 Made the Log.findRange comment more reflective of what the method does. I hope to remove this entirely in the next phase.
20.2 Fixed mangled paren in close()
20.3 bytesSinceLastIndexEntry. Yes, good catch. This is screwed up. This was moved into LogSegment, but the read and update are split in two places. Fixed.
20.4 append(): "We need to have both the begin offset and the end offset returned by Log.append()". Made Log.append return (Long, Long). I am not wild about this change, but I see the need. I had to refactor KafkaApis slightly since we were constructing an intermediate response object in the produceToLocalLog method (which was kind of weird anyway) so there was only one offset and since this is an API object we can't change it. I think the use of API objects in the business logic is a bit dangerous for this reason.
20.5 Fixed broken log statement to use correct format param.
20.6 truncateTo(): The usage of logEndOffset in the following statement is incorrect. Changed this to use Log.findInRange which I think is the intention.
20.7 "There are several places where we need to create a log segment and the code for creating the new data file and the new index file is duplicate. Could we create a utility function createNewSegment to share the code?" Good idea, done. There is still a lot more refactoring that could be done between Log and LogSegment, but I am kind of putting that off.
21. LogSegment: "bytesSinceLastIndexEntry needs to be updated in append()." Fixed.
22. FileMessageSet.searchFor() fixed bad byte arithmetic.
23. OffsetIndex:
23.1 Fixed bad english in comment
23.2 mmap initialization: Yes, this doesn't make sense. The correct logic is that the mutable case must be set to index 0, and the read-only case doesn't matter. This was happening implicitly since byte buffers initialize to 0, but I switched it to make it explicit.
23.3 append(): "If index entry is full, should we automatically roll the log segment?" This is already handled in Log.maybeRoll(segment) which checks segment.index.isFull
23.4 makeReadOnly(): "should we call flush after raf.setLength()?" This is a good point. I think
what you are saying is that the truncate call itself needs the metadata to flush to be considered stable. Calling force on the mmap after the setLength won't do this. Instead I changed the file open to use synchronous mode "rws" which should automatically fsync metadata when we call setLength. The existing flush is okay: I verified that flush doesn't cause the sparse file to desparsify or anything like that. "Also, should we remap the index file to the current length and make it read only?" Well, this isn't really needed. There is no problem with truncating a file post mmap, but I guess making the mapping read-only could prevent corruption due to any bugs we might have so I made that change.
LogManager
24. "log indentation already adds LogManager in the prefix of each log entry." Oops.
25. KafkaApis:
25.1 "handleFetchRequest: topicDatas is weird since data is the plural form of datum. How about topicDataMap?" Changed to dataRead (I don't like having the type in the name).
25.2 "ProducerRequestPurgatory: It seems that it's useful to keep the logIndent since it can distinguish logs from the ProducerRequestPurgatory and FetchRequestPurgatory. Also, it's probably useful to pass in brokerId to RequestPurgatory for debugging unit tests." Agreed, accidentally removed this; added it back.
26. "Partition: There are a few places that the first character of info log is changed to lower case. The current convention is to already use upper case." Made all upper case.
27. "javaapi.ByteBufferMessageSet: underlying should be private val." Changed.
28. "DumpLogSegment: Now that each message stores an offset, we should just print the offset in MessageAndOffset. There is no need for var offset now." Removed.
29. "FetchedDataChunk: No need to use val for parameters in constructor since this is a case class now." Wait is everything a val in a case class? I made this change, but don't know what it means...
30. PartitionData:
30.1 "No need to redefine equals and hashcode since this is already a case class." Yeah, this was fixing a bug in the equals/hashcode stuff due to the array that went away when i rebased. Removed it
30.2 "initialOffset is no longer needed." I think PartitionData is also used by ProducerRequest. This is a bug, but I think we do need the initial offset for the other case. Until we separate these two I don't think I can remove it.
31. "PartitionTopicInfo.enqueue(): It seems that next can be computed using shallow iterator." Ah, very nice. Changed that.
32. "ByteBufferMessageSet: In create() and decompress(), we probably should close the output and the input stream in a finally clause in case we hit any exception during compression and decompression." These are not real output streams. I can close them, but they are just arrays so I think it is just noise, no?
33. "remove unused imports." Eclipse doesn't identify them, will swing by.
34. "How do we handle the case that a consumer uses too small a fetch size?" Added a check and throw for this in ConsumerIterator.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469794#comment-13469794 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

It is really hard/error-prone to keep this patch alive and functioning, I basically spend half of each day on rebasing then debugging the new bugs i introduce during rebasing. Could we do it as a post commit review? I am totally down to fix/change things, but the problem is each new change may take a few iterations and meanwhile the whole hunk has to be kept alive. In an ideal world I would have found a way to have done this in smaller pieces, but it is kind of a cross-cutting change so that was hard.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468095#comment-13468095 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Ran system test, passes:


2012-10-02 14:11:50,376 - INFO - ======================================================
2012-10-02 14:11:50,376 - INFO - stopping all entities
2012-10-02 14:11:50,376 - INFO - ======================================================

2012-10-02 14:12:43,105 - INFO - =================================================
2012-10-02 14:12:43,105 - INFO -                  TEST REPORTS
2012-10-02 14:12:43,105 - INFO - =================================================
2012-10-02 14:12:43,105 - INFO - test_case_name : testcase_1
2012-10-02 14:12:43,105 - INFO - test_class_name : ReplicaBasicTest
2012-10-02 14:12:43,105 - INFO - validation_status : 
2012-10-02 14:12:43,105 - INFO -     Leader Election Latency - iter 2 brokerid 3 : 49636.00 ms
2012-10-02 14:12:43,105 - INFO -     Validate leader election successful : PASSED
2012-10-02 14:12:43,106 - INFO -     Unique messages from consumer : 850
2012-10-02 14:12:43,106 - INFO -     Validate for data matched : PASSED
2012-10-02 14:12:43,106 - INFO -     Unique messages from producer : 850
2012-10-02 14:12:43,106 - INFO -     Leader Election Latency - iter 1 brokerid 2 : 354.00 ms

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457106#comment-13457106 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Great feedback, thanks.

1. Good point about nextOffset. I think this is slightly tricky to fix. I think I will ignore this problem and work on phase 2 which will fix that issue by making nextOffset=offset+1. This means taking both patches at once which will be a bit big. Sound feasible?
2-4 Good feedback
5.1. Good point.
5.2. I will do a little micro-benchmark on decompression/re-compression. Yes, we can definitely avoid this for the replica fetcher thread. Depending on how much we want to optimize that path there are a lot of options. On the extreme side of total trust I think it might actually possible to do FileChannel.transferTo directly from the socket buffer, though there are complications around metrics and hw mark. I think for now it makes sense to just skip decompression. One question: let's say recompression turns out to be expensive, there are two options: (1) do not set internal offsets (as today), (2) eat the cost and recommend snappy instead of gzip. Personally I prefer (2) since I think we need to fix the correctness bugs, but I am open to implementing either if there is a consensus.
5.3. Good catch
6. OK
7. I am not sure. We had a custom implementation of equals but no hashcode which I think was likely wrong. We can remove both, but I would want to figure out why we added the equals.
8-9. OK
10. Ah, forgot to add that. I think the right thing is just to check (currentDataChunk.messages.size > 0 && currentDataChunk.buffer.size == fetchSize) throw Exception() in the ConsumerIterator. The only thing to consider is that this means there is no check for the simpleconsumer.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-v1-draft.patch

Add key to message and reorder some fields
Bump up Message magic number to 2
Add offset to MessageSet format
Make MessageAndOffset contain the current offset and add a nextOffset() method to get the next offset
Some misc. cleanups (delete some obsolete files, fix bad formatting)

There are still two problems with this patch:
1. Not handling offsets properly in compressed messages
2. Unit test failures in LogRecoveryTest
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13471637#comment-13471637 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

Should apply on HEAD.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13471698#comment-13471698 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

+1 from me.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-v4-changes-since-v3.patch
                KAFKA-506-phase-2-v4.patch

Here is a new patch that addresses these comments. I also did an incremental diff against the previous patch so you can see the specific changes for the below items (that is KAFKA-506-v4-changes-since-v3.patch)

Also rebased again.

40. I actually disagree. It is more code to add and subtract, but I think it makes more sense. This way we would say the append api returns "the first and last offset for the messages you appended" rather than "the first offset for the messages you appended and the offset of the next message that would be appended". This is not a huge deal so I can go either way, but I did think about it both ways and that was my rationale.

41. My thinking was that there were only two cases: re-creating a new, mutable index (at position 0) and opening a read-only index. In reality there are three cases: in addition to the previous two you can be re-opening an existing log that went through clean shutdown. I was not handling this properly and in fact was truncating the index on re-open, so the existing entries in the last segment would be unindexed. There are now two cases for mutable indexes. Recall that on clean-shutdown the index is always truncated to the max valid entry. So now when we open an index, if the file exists, I set the position to the end of the file. If the file doesn't exist I allocate it and start at position 0. The recovery process well still re-create the index if it runs, if the shutdown was clean then we will just roll to a new segment on the first append (since the index was truncated, it is now full).

43. I removed that feature since the iterator only has the offset not the file position. However after thinking about it I can add it back by just using MessageSet.entrySize(message) on each entry and use the sum of these to compare to the messageSet.sizeInBytes. Added that.

44. Changed the check to be the messageSet.sizeInBytes. This check was really meant to guard the case where we are at the end of the log and get an empty set. I think it was using validBytes because it needed to calculate the next offset. Now that calculation is gone, so I think it is okay to just use messageSet.sizeInBytes. This would result in a set with 0 valid bytes being enqueued, and then the error getting thrown to the consumer. The fetcher would likely continue to fetch this message set, but that should be bounded by the consumer queue size.

45. The behavior after this patch should be exactly the same as the current behavior, so my hope was to do this as a follow up patch.

Also: Found that I wasn't closing the index when the log was closed, and found a bug in the index re-creation logic in recovery; fixed both, and expanded tests for this.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468896#comment-13468896 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Patch v4 looks good overall. A couple of remaining issues:

50. testCompressionSetConsumption seems to fail transiently for me with the following exception. This seems to be related to the change made for #44.
kafka.common.MessageSizeTooLargeException: The broker contains a message larger than the maximum fetch size of this consumer. Increase the fetch size, or decrease the maximum message size the broker will allow.
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:87)

51. ConsumerIterator: When throwing MessageSizeTooLargeException, could we add the topic/partition/offset to the message string in the exception?

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-neha-post-review-v2.patch

This patch is identical to the previous Neha related patch except that now in the event that a log segment can't be deleted we throw KafkaStorageException. In KafkaApis.handleLeaderAndISRRequest we catch this exception and shutdown the server.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-neha-post-review-v2.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456744#comment-13456744 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

Thanks for patch v1. Overall, the log format change is reasonable. Some comments:

1. MessageAndOffset: nextOffset is not correct for compressed messages. Currently, in the high-level consumer, after iterating each message, the consume offset is moved to the offset of the next message. So, if one consumes a message and then commits the offset, the committed offset points to the next message to be consumed. We could probably change the protocol to move the consumer offset to the offset of the current message. Then, the caller will need to commit the offset first and then consumes the message to get the same semantics.

2. Message:
2.1 The comment of the message has a bug. Payload should have (N- K - 10) bytes.
2.2 In constructor, should we assert that offset is btw 0 and bytes.length-1? Also, just to be clear that offset and size are for the payload, should we rename bytes, offset and size to something like payload, payloadOffset and payloadSize?
2.3 computeChecksum(): can use MagicOffset for both starting offset and length
2.4 remove unused import

3. MessageSet: Fix the comment in second line "A The format".

4. ByteBufferMessageSet: remove unused comment

5. Log:
5.1 append(): For verifying message size, we need to use the shallow iterator since a compressed message has to be smaller than the configured max message size.
5.2 append(): Compressed messages are forced to be decompressed and then compressed again. This will introduce some CPU overhead. What's the increase in CPU utilization if incoming messages are compressed? Also, for replicaFetchThread, it can just put the data fetched from the leader directly into the log without recomputing the offsets. Could we add a flag in append to bypass regenerating the offsets?
5.3 trimInvalidBytes(): There is a bug in the following statement: messages.size should be messages.sizeInBytes.
    if(messageSetValidBytes == messages.size) {

6. javaapi.ByteBufferMessageSet: Java users shouldn't really be using buffer. So, we don't need the bean property.

7. PartitionData: Do we need to override equal and hash since this is already a case class?

8. ZkUtils.conditionalUpdatePersistenPath(): This method expects exception due to version conflict. So there is no need to log the exception.

9. SyncProducerTest: remove unused imports

10. How do we handle the case that a consumer uses too small a fetch size?
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-neha-post-review.patch

Hi Neha, here are some comments on your comments and a patch that addresses the comments we are in agreement on.

1. Log
1.2, 1.3 True. This problem exists in both OffsetIndex and Log, though I don't think either are actually possible. In Log this requires one to have 2 billion segment files, though, which is not physically possible; in OffsetIndex one would need to have ~2 billion entries in an index, which isn't possible as the message overhead would fill up the log segment first. I am going to leave it alone in Log since that code I want to delete asap anyway. I fixed it in the OffsetIndex since that code is meant to last.
1.4. This logic is a little odd, I will fix it, but actually this reminds me of a bigger problem. If file.delete() fails on the log file, the presence of that log file will effectively corrupt the log on restart (since we will have a file with the given offset but will also start another log with a parallel offset that we actually append to--on restart the bad file will mask part of the new file). Obviously if file.delete() fails things are pretty fucked and there is nothing we can do in software to recover. So what I would like to do is throw KafkaStorageException and have Partition.makeFollower() shut down the server. What would happen in the leadership transfer if I did that?
1.5 Filed a JIRA for this.

LogManager
2.1 Deleted numPartitions (not related to this patch, I don't think)

FileMessageSet
3.1 Good catch, fixed.
3.2 Right, so I return the offset specifically to be able to differentiate the case where I found the exact location versus the next message. This is important for things like truncate. I always return the offset and corresponding file position of the first offset that meets the >= criteria. So either I am confused, or I think it works the way you are saying it should.
3.3 Well, but the code actually reads and Int and Long out of the resulting buffer, so if MessageSet.LogOverhead != 12 there is a bug, so we aren't abstracting anything just adding a layer of obfuscation. But, yes, it should be consistent, so changed it.

LogSegment
4. LogSegment
4.1 I don't want to allocate an object for each call as this method is internal to LogSegment. I will make it private to emphasize that.
4.2 I agree, though we have had the 2gb limit for a while now so this isn't new. We repurposed KAFKA-556 for this.

5. ConsumerIterator
Agreed. Broke this into a separate issue since current state is no worse than 0.7.x. JIRA is KAFKA-546.

6. ReplicaFetcherThread
Agreed this was discussed above. JIRA is KAFKA-557.

7. Only IDEA detects this, which I don't have. So can't help on this.

8. ByteBufferMessageSet
8.2 Fixed

9. OffsetIndex 
9.1 Fixed
9.2 This is true but I think it would be more convoluted. Simple test and exits make it so you don't have to add another layer of nesting.

10 Agreed, of the various things on my plate I think this is the most important. Any issues here are resolvable, but we need to first get the data.

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-neha-post-review.patch, KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Comment Edited] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472753#comment-13472753 ] 

Neha Narkhede edited comment on KAFKA-506 at 10/10/12 12:58 AM:
----------------------------------------------------------------

I have to mention that there is a possibility that some of my comments are not related to this patch directly, but were found while inspecting the new code closely :) Since you know the code better, feel free to file follow up JIRAs

1. Log

1.2 In findRange, the following statements runs the risk of hitting overflow, giving incorrect results from the binary search -
      val mid = ceil((high + low) / 2.0).toInt
Will probably be better to use
     val mid = low + ceil((high - low)/2.0).toInt
1.3 It seems that there are only 2 usages of the findRange API that takes in the array length . We already have an API that covers that use case - findRange[T <: Range](ranges: Array[T], value: Long) and this is used by a majority of API calls.
We can make the findRange method that has the actual binary search logic private and changes the 2 use cases in Log.scala to use the public method that assumes the array length.
1.4 In truncateTo, it is possible that the log file was successfully deleted but the index file was not. In this case, we would end up an unused index file that is never deleted from the kafka log directory.
1.5 In loadSegments, we need to rebuild any missing index files. Or it will error out at a later time. Do we have a follow up JIRA to cover this, it seems like a blocker to me.

2. LogManager
2.1 numPartitions is an unused class variable

3. FileMessageSet
3.1. In searchFor API, fix comment to mention that it searches for the first/least offset that is >= the target offset. Right now it says search for the last offset that is >= target offset
3.2 The searchFor API returns a pair of (offset, position). Right now, it does not always return the offset of the message at the returned position. If the file message set is sparse, it returns the offset of the next message, so the offset and position do not point to the same message in the log. Currently, we are not using the offset returned by the read() API, but in the future if we do, it will be good for it to be consistent.
3.3 In searchFor API, one of the statements uses 12 and the other uses MessageSet.LogOverhead. I think the while condition is better understood if it said MessageSet.LogOverhead.

4. LogSegment
4.1 It is better to make translateOffset return an Option. That way, every usage of this API will be forced to handle the case when the position was not found in the log segment.
4.2 I guess it might make sense to have all the places that uses this segment size to a an Int instead of Long. 

5. ConsumerIterator

Right now, while committing offsets for a compressed message set, the consumer can still get duplicates. However, we could probably fix this by making the ConsumerIterator smarter and discarding messages with offset < fetch offset.

6. ReplicaFetcherThread

When the follower fetches data from the leader, it uses log.append which re-computes the logical message ids. This involves recompression when the data is compressed, which it is in production. This can be avoided by making the data copy from leader -> follower smarter

7. MessageCompressionTest
There are 2 unused imports in this file

8. ByteBufferMessageSet
8.1 There are 3 unused imports in this file
8.2 The return statement in create() API is redundant

9. OffsetIndex
9.1 The last return statement in indexSlotFor is redundant
9.1 The first return statement in indexSlotFor can be safely removed by using case-match or putting the rest of the logic in the else part of if-else block.

10. Performance
Performance test to see the impact on throughput/latency if any due to this patch. What I am curious about is the performance impact due to the following, which are the changes that can impact performance as compared to pre KAFKA-506 -
10.1 Recompression of data during replica reads
10.2 Recompression of data to assign correct offsets inside a compressed message set
10.3 The linear search in the file segment to find the message with a given id. This depends on the index interval and there needs to be a balance between index size and index interval.
10.4 The impact of making the log memory mapped.
10.5 Overhead of using the index to read/write data in Kafka

11. KafkaApis
Unused imports in this file

Just to summarize so that we understand the follow up work and also the JIRAs that got automatically resolved due to this feature. Please correct me if I missed something here -

Follow up JIRAs
1. Retain key in producer (KAFKA-544)
2. Change sizeInBytes() to Int (KAFKA-556)
3. Fix consumer offset commit in ConsumerIterator for compressed message sets (KAFKA-546)
4. Remove the recompression involved while fetching data from follower to leader (KAFKA-557)
5. Rebuild missing index files (KAFKA-561)
6. Add performance test for log subsystem (KAFKA-545)
7. Overall Performance analysis due to the factors listed above

JIRAs resolved due to this feature
1. Fix offsets returned as part of producer response (KAFKA-511)
2. Consumer offset issue during unclean leader election (KAFKA-497)



                
      was (Author: nehanarkhede):
    I have to mention that there is a possibility that some of my comments are not related to this patch directly, but were found while inspecting the new code closely :) Since you know the code better, feel free to file follow up JIRAs

1. Log

1.2 In findRange, the following statements runs the risk of hitting overflow, giving incorrect results from the binary search -
      val mid = ceil((high + low) / 2.0).toInt
Will probably be better to use
     val mid = low + ceil((high - low)/2.0).toInt
1.3 It seems that there are only 2 usages of the findRange API that takes in the array length . We already have an API that covers that use case - findRange[T <: Range](ranges: Array[T], value: Long) and this is used by a majority of API calls.
We can make the findRange method that has the actual binary search logic private and changes the 2 use cases in Log.scala to use the public method that assumes the array length.
1.4 In truncateTo, it is possible that the log file was successfully deleted but the index file was not. In this case, we would end up an unused index file that is never deleted from the kafka log directory.
1.5 In loadSegments, we need to rebuild any missing index files. Or it will error out at a later time. Do we have a follow up JIRA to cover this, it seems like a blocker to me.

2. LogManager
2.1 numPartitions is an unused class variable

3. FileMessageSet
3.1. In searchFor API, fix comment to mention that it searches for the first/least offset that is >= the target offset. Right now it says search for the last offset that is >= target offset
3.2 The searchFor API returns a pair of (offset, position). Right now, it does not always return the offset of the message at the returned position. If the file message set is sparse, it returns the offset of the next message, so the offset and position do not point to the same message in the log. Currently, we are not using the offset returned by the read() API, but in the future if we do, it will be good for it to be consistent.
3.3 In searchFor API, one of the statements uses 12 and the other uses MessageSet.LogOverhead. I think the while condition is better understood if it said MessageSet.LogOverhead.

4. LogSegment
4.1 It is better to make translateOffset return an Option. That way, every usage of this API will be forced to handle the case when the position was not found in the log segment.
4.2 I guess it might make sense to have all the places that uses this segment size to a an Int instead of Long. 

5. ConsumerIterator

Right now, while committing offsets for a compressed message set, the consumer can still get duplicates. However, we could probably fix this by making the ConsumerIterator smarter and discarding messages with offset < fetch offset.

6. ReplicaFetcherThread

When the follower fetches data from the leader, it uses log.append which re-computes the logical message ids. This involves recompression when the data is compressed, which it is in production. This can be avoided by making the data copy from leader -> follower smarter

7. MessageCompressionTest
There are 2 unused imports in this file

8. ByteBufferMessageSet
8.1 There are 3 unused imports in this file
8.2 The return statement in create() API is redundant

9. OffsetIndex
9.1 The last return statement in indexSlotFor is redundant
9.1 The first return statement in indexSlotFor can be safely removed by using case-match or putting the rest of the logic in the else part of if-else block.

10. Performance
Performance test to see the impact on throughput/latency if any due to this patch. What I am curious about is the performance impact due to the following, which are the changes that can impact performance as compared to pre KAFKA-506 -
10.1 Recompression of data during replica reads
10.2 Recompression of data to assign correct offsets inside a compressed message set
10.3 The linear search in the file segment to find the message with a given id. This depends on the index interval and there needs to be a balance between index size and index interval.
10.4 The impact of making the log memory mapped.
10.5 Overhead of using the index to read/write data in Kafka

Just to summarize so that we understand the follow up work and also the JIRAs that got automatically resolved due to this feature. Please correct me if I missed something here -

Follow up JIRAs
1. Retain key in producer (KAFKA-544)
2. Change sizeInBytes() to Int (KAFKA-556)
3. Fix consumer offset commit in ConsumerIterator for compressed message sets (KAFKA-546)
4. Remove the recompression involved while fetching data from follower to leader (KAFKA-557)
5. Rebuild missing index files (JIRA to be filed)

                  
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-v1.patch

Updated the patch. This patch fixes the remaining failing tests and correctly handles compressed messages.

This patch is ready for review.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469949#comment-13469949 ] 

Joel Koshy commented on KAFKA-506:
----------------------------------

Rebasing is painful for sure, especially since 0.8 is moving quite fast. I think the other patches in flight are either small or otherwise straightforward to rebase as they don't have significant overlap. So it seems holding off all check-ins until after this weekend would work for everyone right?

                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13469945#comment-13469945 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

What we can do is to hold off committing other conflicting patches for now and have this patch more thoroughly reviewed. If there are no major concerns, we can just commit the patch and have follow-up jiras to address minor issues. Neha, do you think that you can finish the review by Saturday?
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13471628#comment-13471628 ] 

Neha Narkhede commented on KAFKA-506:
-------------------------------------

Agree that rebasing is painful. In addition to more reviews, the hope was to check in ~20 more test cases as part of KAFKA-502, so we could test it out thoroughly. But we can check it in and return to fixing issues later as well. 
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468644#comment-13468644 ] 

Jun Rao commented on KAFKA-506:
-------------------------------

There is another issue:

45. ConsumerIterator: Now that we index each message inside a compressed message, we need to handle the case when a fetch request starting on an offset in the middle of a compressed message. In makeNext(), we need to first skip messages whose offset is less than currentDataChunk.fetchOffset. Otherwise, the consumer would get duplicates. We probably can do this in a followup jira since currently the consumer can get duplicates on compressed messages too.


                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment:     (was: KAFKA-506-phase-2-v2.patch)
    
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13470497#comment-13470497 ] 

Jay Kreps commented on KAFKA-506:
---------------------------------

jkreps-mn:kafka-git jkreps$ git pull
remote: Counting objects: 72, done.
remote: Compressing objects: 100% (37/37), done.
remote: Total 42 (delta 26), reused 0 (delta 0)
Unpacking objects: 100% (42/42), done.
>From git://git.apache.org/kafka
   0aa1500..65e139c  0.8        -> origin/0.8
Auto-merging core/src/main/scala/kafka/api/FetchResponse.scala
CONFLICT (content): Merge conflict in core/src/main/scala/kafka/api/FetchResponse.scala
Auto-merging core/src/main/scala/kafka/api/ProducerRequest.scala
CONFLICT (content): Merge conflict in core/src/main/scala/kafka/api/ProducerRequest.scala
Auto-merging core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
Auto-merging core/src/main/scala/kafka/server/AbstractFetcherThread.scala
CONFLICT (content): Merge conflict in core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Auto-merging core/src/main/scala/kafka/server/KafkaApis.scala
CONFLICT (content): Merge conflict in core/src/main/scala/kafka/server/KafkaApis.scala
Auto-merging core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Auto-merging core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
Auto-merging core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Auto-merging core/src/test/scala/unit/kafka/utils/TestUtils.scala
Automatic merge failed; fix conflicts and then commit the result.

:-(
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-506:
----------------------------

    Attachment: KAFKA-506-phase-2-v2.patch
    
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Resolved] (KAFKA-506) Store logical offset in log

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps resolved KAFKA-506.
-----------------------------

    Resolution: Fixed

Committed.
                
> Store logical offset in log
> ---------------------------
>
>                 Key: KAFKA-506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-506
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>             Fix For: 0.8
>
>         Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
>
> Currently we only support retention by dropping entire segment files. A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values either by a user defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>       8 byte offset
>       4 byte message_size
>       N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes, but retains the physical offset. The second adds the lookup structure and moves to logical offset.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently during iteration over a ByteBufferMessageSet we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira