You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jay Kreps <ja...@gmail.com> on 2013/08/03 07:19:34 UTC

compression performance

Chris commented in another thread about the poor compression performance in
0.8, even with snappy.

Indeed if I run the linear log write throughput test on my laptop I see
75MB/sec with no compression and 17MB/sec with snappy.

This is a little surprising as snappy claims 200MB round-trip performance
(compress + uncompress) from java. So what is going on?

Well you may remember I actually filed a bug a while back on all the
inefficient byte copying in the compression path (KAFKA-527). I didn't
think too much of it, other than it is a bit sloppy, since after all
computers are good at copying bytes, right?

Turns out not so much, if you look at a profile of the standalone log test
you see that with no compression 80% of the time goes to FileChannel.write,
which is reasonable since that is what a log does.

But with snappy enabled only 5% goes to writing data, 50% of the time goes
to byte copying and allocation, and only about 22% goes to actual
compression and decompression (with lots of misc stuff in their I haven't
bothered to tally).

If someone was to optimize this code path I think we could take a patch in
0.8.1. It shouldn't be too hard, just using the backing array on the byte
buffer and avoiding all the input streams, output streams, byte array
output streams, and intermediate message blobs.

I summarized this along with how to reproduce the test results here:
https://issues.apache.org/jira/browse/KAFKA-527

-Jay

Re: compression performance

Posted by Jay Kreps <ja...@gmail.com>.
Sriram, I think I agree. Guozhang's proposal is clever but it exposes a lot
of complexity to the consumer. But I think it is good to have the complete
discussion.

Chris, we will certainly not mess up the uncompressed case, don't worry. I
think your assumption is that compression needs to be slow. I think where
Sriram and I are coming from is that we think that if snappy can roundtrip
at 400MB/core cpu is not going to be a bottleneck and so this will be
"free". We think the issue you are seeing is really not due to compression
so much as it is due to silliness on our part. Previously that silliness
was on the producer side, where for us it was masked in 0.7 by the fact
that we have like 10,000 producers so the additional cpu wasn't super
noticable; obviously once you centralize that down to a few dozen brokers
the problem becomes quite acute. Even Guozhang's proposal would only remove
the recompression, the decompression is still there.

-Jay


On Thu, Aug 15, 2013 at 7:50 PM, Chris Hogue <cs...@gmail.com> wrote:

> I would generally agree with the key goals you've suggested.
>
> I'm just coming to this discussion after some recent testing with 0.8 so I
> may be missing some background. The reference I found to this discussion is
> the JIRA issue below. Please let me know if there are others things I
> should look at.
>
>
> https://issues.apache.org/jira/browse/KAFKA-595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
>
> If I'm reading this correctly the reasoning behind removing compression
> from the producer is that its benefit (network bandwidth saved) is
> outweighed by the cost of the un-compress/re-compress on the broker. The
> issue with the odd heuristic about which codec to use on the broker makes
> sense.
>
> However I think the implied assumption that the broker will always
> un-compress and re-compress warrants discussion. This doesn't necessarily
> have to be the case as the approach outlined in this thread suggests. And
> if you remove that assumption you free up a lot of potential in the
> brokers.
>
> While one way to look at this is "we're already doing it on the broker, why
> do it on the producer", we came to it from the other angle, "we're already
> doing it on the producer, why have the broker do it again". I can certainly
> see cases where each would be appropriate.
>
> As noted in other threads removing compression from the broker's
> responsibility increased our throughput over 3x. This is still doing
> compression on the producer app, just outside of the kafka API, so it still
> benefits from the reduced network bandwidth the current built-in
> compression has.
>
> I appreciate the gains that can be had through optimizing the byte
> management in that code path. That seems like a good path to go down for
> the general case. But no matter how much you optimize it there's still
> going to be a non-trivial cost on the broker.
>
> So in an ideal world the Kafka APIs would have a built-in ability for us to
> choose at an application level whether we want the compression load to be
> on the producer or the broker. At a minimum I'm really hoping our ability
> to do that ourselves doesn't go away, especially if we're willing to take
> on the responsibility of batching/compressing. Said another way, we would
> at least need the optimized code path for uncompressed messages in
> ByteBufferMessageSet.assignOffsets() to stick around so that we can do it
> on our own.
>
> Thanks for all of the consideration here, it's a good discussion.
>
>
> -Chris
>
>
>
>
> On Thu, Aug 15, 2013 at 2:23 PM, Sriram Subramanian <
> srsubramanian@linkedin.com> wrote:
>
> > We need to first decide on the right behavior before optimizing on the
> > implementation.
> >
> > Few key goals that I would put forward are -
> >
> > 1. Decoupling compression codec of the producer and the log
> > 2. Ensuring message validity by the server on receiving bytes. This is
> > done by the iterator today and this is important to ensure bad data does
> > not creep in
> > 3. Simple consumer implementation
> > 4. Implementation that has good performance and efficiency
> >
> > With the above points in mind, I suggest we switch to Snappy as the
> > default compression, optimize the code on the server end to avoid
> > unnecessary copies and remove producer side compression completely except
> > for cross DC sends.
> >
> > On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:
> >
> > >Here is a comment from Guozhong on this issue. He posted it on the
> > >compression byte-copying issue, but it is really about not needing to do
> > >compression. His suggestion is interesting though it ends up pushing
> more
> > >complexity into consumers.
> > >
> > >Guozhang Wang commented on KAFKA-527:
> > >-------------------------------------
> > >
> > >One alternative approach would be like this:
> > >
> > >Currently in the compression code (ByteBufferMessageSet.create), for
> each
> > >message we write 1) the incrementing logical offset in LONG, 2) the
> > >message
> > >byte size in INT, and 3) the message payload.
> > >
> > >The idea is that since the logical offset is just incrementing, hence
> with
> > >a compressed message, as long as we know the offset of the first
> message,
> > >we would know the offset of the rest messages without even reading the
> > >offset field.
> > >
> > >So we can ignore reading the offset of each message inside of the
> > >compressed message but only the offset of the wrapper message which is
> the
> > >offset of the last message + 1, and then in assignOffsets just modify
> the
> > >offset of the wrapper message. Another change would be at the consumer
> > >side, the iterator would need to be smart of interpreting the offsets of
> > >messages while deep-iterating the compressed message.
> > >
> > >As Jay pointed out, this method would not work with log compaction since
> > >it
> > >would break the assumption that offsets increments continuously. Two
> > >workarounds of this issue:
> > >
> > >1) In log compaction, instead of deleting the to-be-deleted-message just
> > >setting its payload to null but keep its header and hence keeping its
> slot
> > >in the incrementing offset.
> > >2) During the compression process, instead of writing the absolute value
> > >of
> > >the logical offset of messages, write the deltas of their offset
> compared
> > >with the offset of the wrapper message. So -1 would mean continuously
> > >decrementing from the wrapper message offset, and -2/3/... would be
> > >skipping holes in side the compressed message.
> > >
> > >
> > >On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > >> Chris commented in another thread about the poor compression
> performance
> > >> in 0.8, even with snappy.
> > >>
> > >> Indeed if I run the linear log write throughput test on my laptop I
> see
> > >> 75MB/sec with no compression and 17MB/sec with snappy.
> > >>
> > >> This is a little surprising as snappy claims 200MB round-trip
> > >>performance
> > >> (compress + uncompress) from java. So what is going on?
> > >>
> > >> Well you may remember I actually filed a bug a while back on all the
> > >> inefficient byte copying in the compression path (KAFKA-527). I didn't
> > >> think too much of it, other than it is a bit sloppy, since after all
> > >> computers are good at copying bytes, right?
> > >>
> > >> Turns out not so much, if you look at a profile of the standalone log
> > >>test
> > >> you see that with no compression 80% of the time goes to
> > >>FileChannel.write,
> > >> which is reasonable since that is what a log does.
> > >>
> > >> But with snappy enabled only 5% goes to writing data, 50% of the time
> > >>goes
> > >> to byte copying and allocation, and only about 22% goes to actual
> > >> compression and decompression (with lots of misc stuff in their I
> > >>haven't
> > >> bothered to tally).
> > >>
> > >> If someone was to optimize this code path I think we could take a
> patch
> > >>in
> > >> 0.8.1. It shouldn't be too hard, just using the backing array on the
> > >>byte
> > >> buffer and avoiding all the input streams, output streams, byte array
> > >> output streams, and intermediate message blobs.
> > >>
> > >> I summarized this along with how to reproduce the test results here:
> > >> https://issues.apache.org/jira/browse/KAFKA-527
> > >>
> > >> -Jay
> > >>
> >
> >
>

Re: compression performance

Posted by Jay Kreps <ja...@gmail.com>.
Sriram, I think I agree. Guozhang's proposal is clever but it exposes a lot
of complexity to the consumer. But I think it is good to have the complete
discussion.

Chris, we will certainly not mess up the uncompressed case, don't worry. I
think your assumption is that compression needs to be slow. I think where
Sriram and I are coming from is that we think that if snappy can roundtrip
at 400MB/core cpu is not going to be a bottleneck and so this will be
"free". We think the issue you are seeing is really not due to compression
so much as it is due to silliness on our part. Previously that silliness
was on the producer side, where for us it was masked in 0.7 by the fact
that we have like 10,000 producers so the additional cpu wasn't super
noticable; obviously once you centralize that down to a few dozen brokers
the problem becomes quite acute. Even Guozhang's proposal would only remove
the recompression, the decompression is still there.

-Jay


On Thu, Aug 15, 2013 at 7:50 PM, Chris Hogue <cs...@gmail.com> wrote:

> I would generally agree with the key goals you've suggested.
>
> I'm just coming to this discussion after some recent testing with 0.8 so I
> may be missing some background. The reference I found to this discussion is
> the JIRA issue below. Please let me know if there are others things I
> should look at.
>
>
> https://issues.apache.org/jira/browse/KAFKA-595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
>
> If I'm reading this correctly the reasoning behind removing compression
> from the producer is that its benefit (network bandwidth saved) is
> outweighed by the cost of the un-compress/re-compress on the broker. The
> issue with the odd heuristic about which codec to use on the broker makes
> sense.
>
> However I think the implied assumption that the broker will always
> un-compress and re-compress warrants discussion. This doesn't necessarily
> have to be the case as the approach outlined in this thread suggests. And
> if you remove that assumption you free up a lot of potential in the
> brokers.
>
> While one way to look at this is "we're already doing it on the broker, why
> do it on the producer", we came to it from the other angle, "we're already
> doing it on the producer, why have the broker do it again". I can certainly
> see cases where each would be appropriate.
>
> As noted in other threads removing compression from the broker's
> responsibility increased our throughput over 3x. This is still doing
> compression on the producer app, just outside of the kafka API, so it still
> benefits from the reduced network bandwidth the current built-in
> compression has.
>
> I appreciate the gains that can be had through optimizing the byte
> management in that code path. That seems like a good path to go down for
> the general case. But no matter how much you optimize it there's still
> going to be a non-trivial cost on the broker.
>
> So in an ideal world the Kafka APIs would have a built-in ability for us to
> choose at an application level whether we want the compression load to be
> on the producer or the broker. At a minimum I'm really hoping our ability
> to do that ourselves doesn't go away, especially if we're willing to take
> on the responsibility of batching/compressing. Said another way, we would
> at least need the optimized code path for uncompressed messages in
> ByteBufferMessageSet.assignOffsets() to stick around so that we can do it
> on our own.
>
> Thanks for all of the consideration here, it's a good discussion.
>
>
> -Chris
>
>
>
>
> On Thu, Aug 15, 2013 at 2:23 PM, Sriram Subramanian <
> srsubramanian@linkedin.com> wrote:
>
> > We need to first decide on the right behavior before optimizing on the
> > implementation.
> >
> > Few key goals that I would put forward are -
> >
> > 1. Decoupling compression codec of the producer and the log
> > 2. Ensuring message validity by the server on receiving bytes. This is
> > done by the iterator today and this is important to ensure bad data does
> > not creep in
> > 3. Simple consumer implementation
> > 4. Implementation that has good performance and efficiency
> >
> > With the above points in mind, I suggest we switch to Snappy as the
> > default compression, optimize the code on the server end to avoid
> > unnecessary copies and remove producer side compression completely except
> > for cross DC sends.
> >
> > On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:
> >
> > >Here is a comment from Guozhong on this issue. He posted it on the
> > >compression byte-copying issue, but it is really about not needing to do
> > >compression. His suggestion is interesting though it ends up pushing
> more
> > >complexity into consumers.
> > >
> > >Guozhang Wang commented on KAFKA-527:
> > >-------------------------------------
> > >
> > >One alternative approach would be like this:
> > >
> > >Currently in the compression code (ByteBufferMessageSet.create), for
> each
> > >message we write 1) the incrementing logical offset in LONG, 2) the
> > >message
> > >byte size in INT, and 3) the message payload.
> > >
> > >The idea is that since the logical offset is just incrementing, hence
> with
> > >a compressed message, as long as we know the offset of the first
> message,
> > >we would know the offset of the rest messages without even reading the
> > >offset field.
> > >
> > >So we can ignore reading the offset of each message inside of the
> > >compressed message but only the offset of the wrapper message which is
> the
> > >offset of the last message + 1, and then in assignOffsets just modify
> the
> > >offset of the wrapper message. Another change would be at the consumer
> > >side, the iterator would need to be smart of interpreting the offsets of
> > >messages while deep-iterating the compressed message.
> > >
> > >As Jay pointed out, this method would not work with log compaction since
> > >it
> > >would break the assumption that offsets increments continuously. Two
> > >workarounds of this issue:
> > >
> > >1) In log compaction, instead of deleting the to-be-deleted-message just
> > >setting its payload to null but keep its header and hence keeping its
> slot
> > >in the incrementing offset.
> > >2) During the compression process, instead of writing the absolute value
> > >of
> > >the logical offset of messages, write the deltas of their offset
> compared
> > >with the offset of the wrapper message. So -1 would mean continuously
> > >decrementing from the wrapper message offset, and -2/3/... would be
> > >skipping holes in side the compressed message.
> > >
> > >
> > >On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > >> Chris commented in another thread about the poor compression
> performance
> > >> in 0.8, even with snappy.
> > >>
> > >> Indeed if I run the linear log write throughput test on my laptop I
> see
> > >> 75MB/sec with no compression and 17MB/sec with snappy.
> > >>
> > >> This is a little surprising as snappy claims 200MB round-trip
> > >>performance
> > >> (compress + uncompress) from java. So what is going on?
> > >>
> > >> Well you may remember I actually filed a bug a while back on all the
> > >> inefficient byte copying in the compression path (KAFKA-527). I didn't
> > >> think too much of it, other than it is a bit sloppy, since after all
> > >> computers are good at copying bytes, right?
> > >>
> > >> Turns out not so much, if you look at a profile of the standalone log
> > >>test
> > >> you see that with no compression 80% of the time goes to
> > >>FileChannel.write,
> > >> which is reasonable since that is what a log does.
> > >>
> > >> But with snappy enabled only 5% goes to writing data, 50% of the time
> > >>goes
> > >> to byte copying and allocation, and only about 22% goes to actual
> > >> compression and decompression (with lots of misc stuff in their I
> > >>haven't
> > >> bothered to tally).
> > >>
> > >> If someone was to optimize this code path I think we could take a
> patch
> > >>in
> > >> 0.8.1. It shouldn't be too hard, just using the backing array on the
> > >>byte
> > >> buffer and avoiding all the input streams, output streams, byte array
> > >> output streams, and intermediate message blobs.
> > >>
> > >> I summarized this along with how to reproduce the test results here:
> > >> https://issues.apache.org/jira/browse/KAFKA-527
> > >>
> > >> -Jay
> > >>
> >
> >
>

Re: compression performance

Posted by Chris Hogue <cs...@gmail.com>.
I would generally agree with the key goals you've suggested.

I'm just coming to this discussion after some recent testing with 0.8 so I
may be missing some background. The reference I found to this discussion is
the JIRA issue below. Please let me know if there are others things I
should look at.

https://issues.apache.org/jira/browse/KAFKA-595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

If I'm reading this correctly the reasoning behind removing compression
from the producer is that its benefit (network bandwidth saved) is
outweighed by the cost of the un-compress/re-compress on the broker. The
issue with the odd heuristic about which codec to use on the broker makes
sense.

However I think the implied assumption that the broker will always
un-compress and re-compress warrants discussion. This doesn't necessarily
have to be the case as the approach outlined in this thread suggests. And
if you remove that assumption you free up a lot of potential in the brokers.

While one way to look at this is "we're already doing it on the broker, why
do it on the producer", we came to it from the other angle, "we're already
doing it on the producer, why have the broker do it again". I can certainly
see cases where each would be appropriate.

As noted in other threads removing compression from the broker's
responsibility increased our throughput over 3x. This is still doing
compression on the producer app, just outside of the kafka API, so it still
benefits from the reduced network bandwidth the current built-in
compression has.

I appreciate the gains that can be had through optimizing the byte
management in that code path. That seems like a good path to go down for
the general case. But no matter how much you optimize it there's still
going to be a non-trivial cost on the broker.

So in an ideal world the Kafka APIs would have a built-in ability for us to
choose at an application level whether we want the compression load to be
on the producer or the broker. At a minimum I'm really hoping our ability
to do that ourselves doesn't go away, especially if we're willing to take
on the responsibility of batching/compressing. Said another way, we would
at least need the optimized code path for uncompressed messages in
ByteBufferMessageSet.assignOffsets() to stick around so that we can do it
on our own.

Thanks for all of the consideration here, it's a good discussion.


-Chris




On Thu, Aug 15, 2013 at 2:23 PM, Sriram Subramanian <
srsubramanian@linkedin.com> wrote:

> We need to first decide on the right behavior before optimizing on the
> implementation.
>
> Few key goals that I would put forward are -
>
> 1. Decoupling compression codec of the producer and the log
> 2. Ensuring message validity by the server on receiving bytes. This is
> done by the iterator today and this is important to ensure bad data does
> not creep in
> 3. Simple consumer implementation
> 4. Implementation that has good performance and efficiency
>
> With the above points in mind, I suggest we switch to Snappy as the
> default compression, optimize the code on the server end to avoid
> unnecessary copies and remove producer side compression completely except
> for cross DC sends.
>
> On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:
>
> >Here is a comment from Guozhong on this issue. He posted it on the
> >compression byte-copying issue, but it is really about not needing to do
> >compression. His suggestion is interesting though it ends up pushing more
> >complexity into consumers.
> >
> >Guozhang Wang commented on KAFKA-527:
> >-------------------------------------
> >
> >One alternative approach would be like this:
> >
> >Currently in the compression code (ByteBufferMessageSet.create), for each
> >message we write 1) the incrementing logical offset in LONG, 2) the
> >message
> >byte size in INT, and 3) the message payload.
> >
> >The idea is that since the logical offset is just incrementing, hence with
> >a compressed message, as long as we know the offset of the first message,
> >we would know the offset of the rest messages without even reading the
> >offset field.
> >
> >So we can ignore reading the offset of each message inside of the
> >compressed message but only the offset of the wrapper message which is the
> >offset of the last message + 1, and then in assignOffsets just modify the
> >offset of the wrapper message. Another change would be at the consumer
> >side, the iterator would need to be smart of interpreting the offsets of
> >messages while deep-iterating the compressed message.
> >
> >As Jay pointed out, this method would not work with log compaction since
> >it
> >would break the assumption that offsets increments continuously. Two
> >workarounds of this issue:
> >
> >1) In log compaction, instead of deleting the to-be-deleted-message just
> >setting its payload to null but keep its header and hence keeping its slot
> >in the incrementing offset.
> >2) During the compression process, instead of writing the absolute value
> >of
> >the logical offset of messages, write the deltas of their offset compared
> >with the offset of the wrapper message. So -1 would mean continuously
> >decrementing from the wrapper message offset, and -2/3/... would be
> >skipping holes in side the compressed message.
> >
> >
> >On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> >> Chris commented in another thread about the poor compression performance
> >> in 0.8, even with snappy.
> >>
> >> Indeed if I run the linear log write throughput test on my laptop I see
> >> 75MB/sec with no compression and 17MB/sec with snappy.
> >>
> >> This is a little surprising as snappy claims 200MB round-trip
> >>performance
> >> (compress + uncompress) from java. So what is going on?
> >>
> >> Well you may remember I actually filed a bug a while back on all the
> >> inefficient byte copying in the compression path (KAFKA-527). I didn't
> >> think too much of it, other than it is a bit sloppy, since after all
> >> computers are good at copying bytes, right?
> >>
> >> Turns out not so much, if you look at a profile of the standalone log
> >>test
> >> you see that with no compression 80% of the time goes to
> >>FileChannel.write,
> >> which is reasonable since that is what a log does.
> >>
> >> But with snappy enabled only 5% goes to writing data, 50% of the time
> >>goes
> >> to byte copying and allocation, and only about 22% goes to actual
> >> compression and decompression (with lots of misc stuff in their I
> >>haven't
> >> bothered to tally).
> >>
> >> If someone was to optimize this code path I think we could take a patch
> >>in
> >> 0.8.1. It shouldn't be too hard, just using the backing array on the
> >>byte
> >> buffer and avoiding all the input streams, output streams, byte array
> >> output streams, and intermediate message blobs.
> >>
> >> I summarized this along with how to reproduce the test results here:
> >> https://issues.apache.org/jira/browse/KAFKA-527
> >>
> >> -Jay
> >>
>
>

Re: compression performance

Posted by Chris Hogue <cs...@gmail.com>.
I would generally agree with the key goals you've suggested.

I'm just coming to this discussion after some recent testing with 0.8 so I
may be missing some background. The reference I found to this discussion is
the JIRA issue below. Please let me know if there are others things I
should look at.

https://issues.apache.org/jira/browse/KAFKA-595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

If I'm reading this correctly the reasoning behind removing compression
from the producer is that its benefit (network bandwidth saved) is
outweighed by the cost of the un-compress/re-compress on the broker. The
issue with the odd heuristic about which codec to use on the broker makes
sense.

However I think the implied assumption that the broker will always
un-compress and re-compress warrants discussion. This doesn't necessarily
have to be the case as the approach outlined in this thread suggests. And
if you remove that assumption you free up a lot of potential in the brokers.

While one way to look at this is "we're already doing it on the broker, why
do it on the producer", we came to it from the other angle, "we're already
doing it on the producer, why have the broker do it again". I can certainly
see cases where each would be appropriate.

As noted in other threads removing compression from the broker's
responsibility increased our throughput over 3x. This is still doing
compression on the producer app, just outside of the kafka API, so it still
benefits from the reduced network bandwidth the current built-in
compression has.

I appreciate the gains that can be had through optimizing the byte
management in that code path. That seems like a good path to go down for
the general case. But no matter how much you optimize it there's still
going to be a non-trivial cost on the broker.

So in an ideal world the Kafka APIs would have a built-in ability for us to
choose at an application level whether we want the compression load to be
on the producer or the broker. At a minimum I'm really hoping our ability
to do that ourselves doesn't go away, especially if we're willing to take
on the responsibility of batching/compressing. Said another way, we would
at least need the optimized code path for uncompressed messages in
ByteBufferMessageSet.assignOffsets() to stick around so that we can do it
on our own.

Thanks for all of the consideration here, it's a good discussion.


-Chris




On Thu, Aug 15, 2013 at 2:23 PM, Sriram Subramanian <
srsubramanian@linkedin.com> wrote:

> We need to first decide on the right behavior before optimizing on the
> implementation.
>
> Few key goals that I would put forward are -
>
> 1. Decoupling compression codec of the producer and the log
> 2. Ensuring message validity by the server on receiving bytes. This is
> done by the iterator today and this is important to ensure bad data does
> not creep in
> 3. Simple consumer implementation
> 4. Implementation that has good performance and efficiency
>
> With the above points in mind, I suggest we switch to Snappy as the
> default compression, optimize the code on the server end to avoid
> unnecessary copies and remove producer side compression completely except
> for cross DC sends.
>
> On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:
>
> >Here is a comment from Guozhong on this issue. He posted it on the
> >compression byte-copying issue, but it is really about not needing to do
> >compression. His suggestion is interesting though it ends up pushing more
> >complexity into consumers.
> >
> >Guozhang Wang commented on KAFKA-527:
> >-------------------------------------
> >
> >One alternative approach would be like this:
> >
> >Currently in the compression code (ByteBufferMessageSet.create), for each
> >message we write 1) the incrementing logical offset in LONG, 2) the
> >message
> >byte size in INT, and 3) the message payload.
> >
> >The idea is that since the logical offset is just incrementing, hence with
> >a compressed message, as long as we know the offset of the first message,
> >we would know the offset of the rest messages without even reading the
> >offset field.
> >
> >So we can ignore reading the offset of each message inside of the
> >compressed message but only the offset of the wrapper message which is the
> >offset of the last message + 1, and then in assignOffsets just modify the
> >offset of the wrapper message. Another change would be at the consumer
> >side, the iterator would need to be smart of interpreting the offsets of
> >messages while deep-iterating the compressed message.
> >
> >As Jay pointed out, this method would not work with log compaction since
> >it
> >would break the assumption that offsets increments continuously. Two
> >workarounds of this issue:
> >
> >1) In log compaction, instead of deleting the to-be-deleted-message just
> >setting its payload to null but keep its header and hence keeping its slot
> >in the incrementing offset.
> >2) During the compression process, instead of writing the absolute value
> >of
> >the logical offset of messages, write the deltas of their offset compared
> >with the offset of the wrapper message. So -1 would mean continuously
> >decrementing from the wrapper message offset, and -2/3/... would be
> >skipping holes in side the compressed message.
> >
> >
> >On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> >> Chris commented in another thread about the poor compression performance
> >> in 0.8, even with snappy.
> >>
> >> Indeed if I run the linear log write throughput test on my laptop I see
> >> 75MB/sec with no compression and 17MB/sec with snappy.
> >>
> >> This is a little surprising as snappy claims 200MB round-trip
> >>performance
> >> (compress + uncompress) from java. So what is going on?
> >>
> >> Well you may remember I actually filed a bug a while back on all the
> >> inefficient byte copying in the compression path (KAFKA-527). I didn't
> >> think too much of it, other than it is a bit sloppy, since after all
> >> computers are good at copying bytes, right?
> >>
> >> Turns out not so much, if you look at a profile of the standalone log
> >>test
> >> you see that with no compression 80% of the time goes to
> >>FileChannel.write,
> >> which is reasonable since that is what a log does.
> >>
> >> But with snappy enabled only 5% goes to writing data, 50% of the time
> >>goes
> >> to byte copying and allocation, and only about 22% goes to actual
> >> compression and decompression (with lots of misc stuff in their I
> >>haven't
> >> bothered to tally).
> >>
> >> If someone was to optimize this code path I think we could take a patch
> >>in
> >> 0.8.1. It shouldn't be too hard, just using the backing array on the
> >>byte
> >> buffer and avoiding all the input streams, output streams, byte array
> >> output streams, and intermediate message blobs.
> >>
> >> I summarized this along with how to reproduce the test results here:
> >> https://issues.apache.org/jira/browse/KAFKA-527
> >>
> >> -Jay
> >>
>
>

Re: compression performance

Posted by Sriram Subramanian <sr...@linkedin.com>.
We need to first decide on the right behavior before optimizing on the
implementation.

Few key goals that I would put forward are -

1. Decoupling compression codec of the producer and the log
2. Ensuring message validity by the server on receiving bytes. This is
done by the iterator today and this is important to ensure bad data does
not creep in
3. Simple consumer implementation
4. Implementation that has good performance and efficiency

With the above points in mind, I suggest we switch to Snappy as the
default compression, optimize the code on the server end to avoid
unnecessary copies and remove producer side compression completely except
for cross DC sends.

On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:

>Here is a comment from Guozhong on this issue. He posted it on the
>compression byte-copying issue, but it is really about not needing to do
>compression. His suggestion is interesting though it ends up pushing more
>complexity into consumers.
>
>Guozhang Wang commented on KAFKA-527:
>-------------------------------------
>
>One alternative approach would be like this:
>
>Currently in the compression code (ByteBufferMessageSet.create), for each
>message we write 1) the incrementing logical offset in LONG, 2) the
>message
>byte size in INT, and 3) the message payload.
>
>The idea is that since the logical offset is just incrementing, hence with
>a compressed message, as long as we know the offset of the first message,
>we would know the offset of the rest messages without even reading the
>offset field.
>
>So we can ignore reading the offset of each message inside of the
>compressed message but only the offset of the wrapper message which is the
>offset of the last message + 1, and then in assignOffsets just modify the
>offset of the wrapper message. Another change would be at the consumer
>side, the iterator would need to be smart of interpreting the offsets of
>messages while deep-iterating the compressed message.
>
>As Jay pointed out, this method would not work with log compaction since
>it
>would break the assumption that offsets increments continuously. Two
>workarounds of this issue:
>
>1) In log compaction, instead of deleting the to-be-deleted-message just
>setting its payload to null but keep its header and hence keeping its slot
>in the incrementing offset.
>2) During the compression process, instead of writing the absolute value
>of
>the logical offset of messages, write the deltas of their offset compared
>with the offset of the wrapper message. So -1 would mean continuously
>decrementing from the wrapper message offset, and -2/3/... would be
>skipping holes in side the compressed message.
>
>
>On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
>
>> Chris commented in another thread about the poor compression performance
>> in 0.8, even with snappy.
>>
>> Indeed if I run the linear log write throughput test on my laptop I see
>> 75MB/sec with no compression and 17MB/sec with snappy.
>>
>> This is a little surprising as snappy claims 200MB round-trip
>>performance
>> (compress + uncompress) from java. So what is going on?
>>
>> Well you may remember I actually filed a bug a while back on all the
>> inefficient byte copying in the compression path (KAFKA-527). I didn't
>> think too much of it, other than it is a bit sloppy, since after all
>> computers are good at copying bytes, right?
>>
>> Turns out not so much, if you look at a profile of the standalone log
>>test
>> you see that with no compression 80% of the time goes to
>>FileChannel.write,
>> which is reasonable since that is what a log does.
>>
>> But with snappy enabled only 5% goes to writing data, 50% of the time
>>goes
>> to byte copying and allocation, and only about 22% goes to actual
>> compression and decompression (with lots of misc stuff in their I
>>haven't
>> bothered to tally).
>>
>> If someone was to optimize this code path I think we could take a patch
>>in
>> 0.8.1. It shouldn't be too hard, just using the backing array on the
>>byte
>> buffer and avoiding all the input streams, output streams, byte array
>> output streams, and intermediate message blobs.
>>
>> I summarized this along with how to reproduce the test results here:
>> https://issues.apache.org/jira/browse/KAFKA-527
>>
>> -Jay
>>


Re: compression performance

Posted by Sriram Subramanian <sr...@linkedin.com>.
We need to first decide on the right behavior before optimizing on the
implementation.

Few key goals that I would put forward are -

1. Decoupling compression codec of the producer and the log
2. Ensuring message validity by the server on receiving bytes. This is
done by the iterator today and this is important to ensure bad data does
not creep in
3. Simple consumer implementation
4. Implementation that has good performance and efficiency

With the above points in mind, I suggest we switch to Snappy as the
default compression, optimize the code on the server end to avoid
unnecessary copies and remove producer side compression completely except
for cross DC sends.

On 8/15/13 11:28 AM, "Jay Kreps" <ja...@gmail.com> wrote:

>Here is a comment from Guozhong on this issue. He posted it on the
>compression byte-copying issue, but it is really about not needing to do
>compression. His suggestion is interesting though it ends up pushing more
>complexity into consumers.
>
>Guozhang Wang commented on KAFKA-527:
>-------------------------------------
>
>One alternative approach would be like this:
>
>Currently in the compression code (ByteBufferMessageSet.create), for each
>message we write 1) the incrementing logical offset in LONG, 2) the
>message
>byte size in INT, and 3) the message payload.
>
>The idea is that since the logical offset is just incrementing, hence with
>a compressed message, as long as we know the offset of the first message,
>we would know the offset of the rest messages without even reading the
>offset field.
>
>So we can ignore reading the offset of each message inside of the
>compressed message but only the offset of the wrapper message which is the
>offset of the last message + 1, and then in assignOffsets just modify the
>offset of the wrapper message. Another change would be at the consumer
>side, the iterator would need to be smart of interpreting the offsets of
>messages while deep-iterating the compressed message.
>
>As Jay pointed out, this method would not work with log compaction since
>it
>would break the assumption that offsets increments continuously. Two
>workarounds of this issue:
>
>1) In log compaction, instead of deleting the to-be-deleted-message just
>setting its payload to null but keep its header and hence keeping its slot
>in the incrementing offset.
>2) During the compression process, instead of writing the absolute value
>of
>the logical offset of messages, write the deltas of their offset compared
>with the offset of the wrapper message. So -1 would mean continuously
>decrementing from the wrapper message offset, and -2/3/... would be
>skipping holes in side the compressed message.
>
>
>On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:
>
>> Chris commented in another thread about the poor compression performance
>> in 0.8, even with snappy.
>>
>> Indeed if I run the linear log write throughput test on my laptop I see
>> 75MB/sec with no compression and 17MB/sec with snappy.
>>
>> This is a little surprising as snappy claims 200MB round-trip
>>performance
>> (compress + uncompress) from java. So what is going on?
>>
>> Well you may remember I actually filed a bug a while back on all the
>> inefficient byte copying in the compression path (KAFKA-527). I didn't
>> think too much of it, other than it is a bit sloppy, since after all
>> computers are good at copying bytes, right?
>>
>> Turns out not so much, if you look at a profile of the standalone log
>>test
>> you see that with no compression 80% of the time goes to
>>FileChannel.write,
>> which is reasonable since that is what a log does.
>>
>> But with snappy enabled only 5% goes to writing data, 50% of the time
>>goes
>> to byte copying and allocation, and only about 22% goes to actual
>> compression and decompression (with lots of misc stuff in their I
>>haven't
>> bothered to tally).
>>
>> If someone was to optimize this code path I think we could take a patch
>>in
>> 0.8.1. It shouldn't be too hard, just using the backing array on the
>>byte
>> buffer and avoiding all the input streams, output streams, byte array
>> output streams, and intermediate message blobs.
>>
>> I summarized this along with how to reproduce the test results here:
>> https://issues.apache.org/jira/browse/KAFKA-527
>>
>> -Jay
>>


Re: compression performance

Posted by Jay Kreps <ja...@gmail.com>.
Here is a comment from Guozhong on this issue. He posted it on the
compression byte-copying issue, but it is really about not needing to do
compression. His suggestion is interesting though it ends up pushing more
complexity into consumers.

Guozhang Wang commented on KAFKA-527:
-------------------------------------

One alternative approach would be like this:

Currently in the compression code (ByteBufferMessageSet.create), for each
message we write 1) the incrementing logical offset in LONG, 2) the message
byte size in INT, and 3) the message payload.

The idea is that since the logical offset is just incrementing, hence with
a compressed message, as long as we know the offset of the first message,
we would know the offset of the rest messages without even reading the
offset field.

So we can ignore reading the offset of each message inside of the
compressed message but only the offset of the wrapper message which is the
offset of the last message + 1, and then in assignOffsets just modify the
offset of the wrapper message. Another change would be at the consumer
side, the iterator would need to be smart of interpreting the offsets of
messages while deep-iterating the compressed message.

As Jay pointed out, this method would not work with log compaction since it
would break the assumption that offsets increments continuously. Two
workarounds of this issue:

1) In log compaction, instead of deleting the to-be-deleted-message just
setting its payload to null but keep its header and hence keeping its slot
in the incrementing offset.
2) During the compression process, instead of writing the absolute value of
the logical offset of messages, write the deltas of their offset compared
with the offset of the wrapper message. So -1 would mean continuously
decrementing from the wrapper message offset, and -2/3/... would be
skipping holes in side the compressed message.


On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:

> Chris commented in another thread about the poor compression performance
> in 0.8, even with snappy.
>
> Indeed if I run the linear log write throughput test on my laptop I see
> 75MB/sec with no compression and 17MB/sec with snappy.
>
> This is a little surprising as snappy claims 200MB round-trip performance
> (compress + uncompress) from java. So what is going on?
>
> Well you may remember I actually filed a bug a while back on all the
> inefficient byte copying in the compression path (KAFKA-527). I didn't
> think too much of it, other than it is a bit sloppy, since after all
> computers are good at copying bytes, right?
>
> Turns out not so much, if you look at a profile of the standalone log test
> you see that with no compression 80% of the time goes to FileChannel.write,
> which is reasonable since that is what a log does.
>
> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual
> compression and decompression (with lots of misc stuff in their I haven't
> bothered to tally).
>
> If someone was to optimize this code path I think we could take a patch in
> 0.8.1. It shouldn't be too hard, just using the backing array on the byte
> buffer and avoiding all the input streams, output streams, byte array
> output streams, and intermediate message blobs.
>
> I summarized this along with how to reproduce the test results here:
> https://issues.apache.org/jira/browse/KAFKA-527
>
> -Jay
>

Re: compression performance

Posted by Jay Kreps <ja...@gmail.com>.
Here is a comment from Guozhong on this issue. He posted it on the
compression byte-copying issue, but it is really about not needing to do
compression. His suggestion is interesting though it ends up pushing more
complexity into consumers.

Guozhang Wang commented on KAFKA-527:
-------------------------------------

One alternative approach would be like this:

Currently in the compression code (ByteBufferMessageSet.create), for each
message we write 1) the incrementing logical offset in LONG, 2) the message
byte size in INT, and 3) the message payload.

The idea is that since the logical offset is just incrementing, hence with
a compressed message, as long as we know the offset of the first message,
we would know the offset of the rest messages without even reading the
offset field.

So we can ignore reading the offset of each message inside of the
compressed message but only the offset of the wrapper message which is the
offset of the last message + 1, and then in assignOffsets just modify the
offset of the wrapper message. Another change would be at the consumer
side, the iterator would need to be smart of interpreting the offsets of
messages while deep-iterating the compressed message.

As Jay pointed out, this method would not work with log compaction since it
would break the assumption that offsets increments continuously. Two
workarounds of this issue:

1) In log compaction, instead of deleting the to-be-deleted-message just
setting its payload to null but keep its header and hence keeping its slot
in the incrementing offset.
2) During the compression process, instead of writing the absolute value of
the logical offset of messages, write the deltas of their offset compared
with the offset of the wrapper message. So -1 would mean continuously
decrementing from the wrapper message offset, and -2/3/... would be
skipping holes in side the compressed message.


On Fri, Aug 2, 2013 at 10:19 PM, Jay Kreps <ja...@gmail.com> wrote:

> Chris commented in another thread about the poor compression performance
> in 0.8, even with snappy.
>
> Indeed if I run the linear log write throughput test on my laptop I see
> 75MB/sec with no compression and 17MB/sec with snappy.
>
> This is a little surprising as snappy claims 200MB round-trip performance
> (compress + uncompress) from java. So what is going on?
>
> Well you may remember I actually filed a bug a while back on all the
> inefficient byte copying in the compression path (KAFKA-527). I didn't
> think too much of it, other than it is a bit sloppy, since after all
> computers are good at copying bytes, right?
>
> Turns out not so much, if you look at a profile of the standalone log test
> you see that with no compression 80% of the time goes to FileChannel.write,
> which is reasonable since that is what a log does.
>
> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual
> compression and decompression (with lots of misc stuff in their I haven't
> bothered to tally).
>
> If someone was to optimize this code path I think we could take a patch in
> 0.8.1. It shouldn't be too hard, just using the backing array on the byte
> buffer and avoiding all the input streams, output streams, byte array
> output streams, and intermediate message blobs.
>
> I summarized this along with how to reproduce the test results here:
> https://issues.apache.org/jira/browse/KAFKA-527
>
> -Jay
>

Re: compression performance

Posted by Jan Kotek <di...@kotek.net>.
> you see that with no compression 80% of the time goes to FileChannel.write,

> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual

I had similar problem with MapDB, it was solved by using memory mapped files. 
Not sure how it applies to this case.

Regards,
Jan Kotek


On Friday 02 August 2013 22:19:34 Jay Kreps wrote:
> Chris commented in another thread about the poor compression performance in
> 0.8, even with snappy.
> 
> Indeed if I run the linear log write throughput test on my laptop I see
> 75MB/sec with no compression and 17MB/sec with snappy.
> 
> This is a little surprising as snappy claims 200MB round-trip performance
> (compress + uncompress) from java. So what is going on?
> 
> Well you may remember I actually filed a bug a while back on all the
> inefficient byte copying in the compression path (KAFKA-527). I didn't
> think too much of it, other than it is a bit sloppy, since after all
> computers are good at copying bytes, right?
> 
> Turns out not so much, if you look at a profile of the standalone log test
> you see that with no compression 80% of the time goes to FileChannel.write,
> which is reasonable since that is what a log does.
> 
> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual
> compression and decompression (with lots of misc stuff in their I haven't
> bothered to tally).
> 
> If someone was to optimize this code path I think we could take a patch in
> 0.8.1. It shouldn't be too hard, just using the backing array on the byte
> buffer and avoiding all the input streams, output streams, byte array
> output streams, and intermediate message blobs.
> 
> I summarized this along with how to reproduce the test results here:
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> -Jay

Re: compression performance

Posted by Jan Kotek <di...@kotek.net>.
> you see that with no compression 80% of the time goes to FileChannel.write,

> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual

I had similar problem with MapDB, it was solved by using memory mapped files. 
Not sure how it applies to this case.

Regards,
Jan Kotek


On Friday 02 August 2013 22:19:34 Jay Kreps wrote:
> Chris commented in another thread about the poor compression performance in
> 0.8, even with snappy.
> 
> Indeed if I run the linear log write throughput test on my laptop I see
> 75MB/sec with no compression and 17MB/sec with snappy.
> 
> This is a little surprising as snappy claims 200MB round-trip performance
> (compress + uncompress) from java. So what is going on?
> 
> Well you may remember I actually filed a bug a while back on all the
> inefficient byte copying in the compression path (KAFKA-527). I didn't
> think too much of it, other than it is a bit sloppy, since after all
> computers are good at copying bytes, right?
> 
> Turns out not so much, if you look at a profile of the standalone log test
> you see that with no compression 80% of the time goes to FileChannel.write,
> which is reasonable since that is what a log does.
> 
> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual
> compression and decompression (with lots of misc stuff in their I haven't
> bothered to tally).
> 
> If someone was to optimize this code path I think we could take a patch in
> 0.8.1. It shouldn't be too hard, just using the backing array on the byte
> buffer and avoiding all the input streams, output streams, byte array
> output streams, and intermediate message blobs.
> 
> I summarized this along with how to reproduce the test results here:
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> -Jay