Posted to dev@kafka.apache.org by "Lorenzo Alberton (JIRA)" <ji...@apache.org> on 2012/07/18 19:39:33 UTC

[jira] [Created] (KAFKA-406) Gzipped payload is a full wrapped Message (with headers), not just payload

Lorenzo Alberton created KAFKA-406:
--------------------------------------

             Summary: Gzipped payload is a full wrapped Message (with headers), not just payload
                 Key: KAFKA-406
                 URL: https://issues.apache.org/jira/browse/KAFKA-406
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.7.1
         Environment: N/A
            Reporter: Lorenzo Alberton


When creating a gzipped MessageSet, the collection of Messages is passed to CompressionUtils.compress(), where each message is serialised [1] into a buffer (not just the payload, the full Message with headers, uncompressed), then gzipped, and finally wrapped into another Message [2].

In other words, the consumer has to unwrap the Message flagged as gzipped, unzip the payload, and unwrap the unzipped payload again as a non-compressed Message. 

Is this double-wrapping the intended behaviour? 


[1] messages.foreach(m => m.serializeTo(messageByteBuffer))

[2] new Message(outputStream.toByteArray, compressionCodec) 
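
For anyone reverse-engineering the format, here is a minimal, self-contained
sketch of the behaviour described above. The framing and names are simplified
stand-ins (the real 0.7 Message header also carries a magic byte and a CRC32);
this is not the actual Kafka code.

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer
    import java.util.zip.GZIPOutputStream

    val NoCompression: Byte = 0
    val GzipCodec: Byte = 1

    // Toy framing: [size: 4 bytes][codec: 1 byte][payload].
    case class Message(payload: Array[Byte], codec: Byte = NoCompression) {
      def serializedSize: Int = 4 + 1 + payload.length
      def serializeTo(buf: ByteBuffer): Unit = {
        buf.putInt(1 + payload.length)
        buf.put(codec)
        buf.put(payload)
      }
    }

    def compress(messages: Seq[Message]): Message = {
      // [1] each *full* Message (header included) is serialised, uncompressed
      val buf = ByteBuffer.allocate(messages.map(_.serializedSize).sum)
      messages.foreach(m => m.serializeTo(buf))
      // ...the whole buffer is then gzipped...
      val bos = new ByteArrayOutputStream()
      val gz = new GZIPOutputStream(bos)
      gz.write(buf.array())
      gz.close()
      // [2] ...and wrapped into *another* Message, flagged as gzipped
      Message(bos.toByteArray, GzipCodec)
    }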



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Re: [jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by Michal Hariš <mi...@gmail.com>.
Well, then I'm not sure I follow the re-compression line of reasoning
behind the double wrapping... but I'm only starting to look into the
details of Kafka, so maybe there's another layer of compression that
lumps messages together to save I/O between logs or within the cluster.
Anyway, as for the double wrapping: it would be really good if it were
documented, as a note or something, wherever the wire format is
described.


Re: [jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by Jay Kreps <ja...@gmail.com>.
Currently we always retain the data in the format that it was produced. It
would be a fairly small change, though, to allow a server-override
parameter that would have the server either always compress the data or
always decompress it before writing it to the log.
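
A rough sketch of what such an override could look like, reusing the toy
Message and compress() from the sketch under the issue description, plus
a matching decompress(). All names here are invented for illustration;
this is not actual Kafka configuration or code.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.nio.ByteBuffer
    import java.util.zip.GZIPInputStream

    // Inverse of the toy compress(): gunzip, then re-parse the
    // [size][codec][payload] frames back into Messages.
    def decompress(m: Message): Seq[Message] =
      if (m.codec == NoCompression) Seq(m)
      else {
        val in = new GZIPInputStream(new ByteArrayInputStream(m.payload))
        val out = new ByteArrayOutputStream()
        val tmp = new Array[Byte](4096)
        var n = in.read(tmp)
        while (n != -1) { out.write(tmp, 0, n); n = in.read(tmp) }
        val buf = ByteBuffer.wrap(out.toByteArray)
        val msgs = Seq.newBuilder[Message]
        while (buf.remaining() >= 5) {
          val size = buf.getInt()
          val codec = buf.get()
          val payload = new Array[Byte](size - 1)
          buf.get(payload)
          msgs += Message(payload, codec)
        }
        msgs.result()
      }

    // Hypothetical broker-side override: force one on-disk format
    // regardless of what the producer sent.
    sealed trait LogFormatOverride
    case object KeepProducerFormat extends LogFormatOverride // today's behaviour
    case object AlwaysCompressed extends LogFormatOverride
    case object AlwaysUncompressed extends LogFormatOverride

    def normalizeForLog(batch: Seq[Message],
                        mode: LogFormatOverride): Seq[Message] =
      mode match {
        case KeepProducerFormat => batch
        case AlwaysCompressed => Seq(compress(batch.flatMap(decompress)))
        case AlwaysUncompressed => batch.flatMap(decompress) // CPU paid once, at write
      }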

-Jay


Re: [jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by Michal Hariš <mi...@gmail.com>.
Does it mean that, currently, if a producer publishes an uncompressed
message to a server whose local log format is configured as compressed,
consumers will receive compressed messages when fetching?

Re: [jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by Michal Hariš <mi...@gmail.com>.
I can confirm: it bit me a few days ago while writing a PHP client.
Had it been documented, I wouldn't have needed to spend an hour digging
through the Scala code that serializes the gzipped message, only to
find out there's another message inside.

I also think it genuinely complicates matters: the code that is
supposed to handle the payload has to treat a compressed payload as a
message again, but only for compressed ones, and the same applies when
producing. That's without even counting the potential for the
compression flag slipping to 1 and wrapping messages infinitely..

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13418376#comment-13418376 ] 

Jay Kreps commented on KAFKA-406:
---------------------------------

I actually don't see how the compression flag can slip to 1 unnoticed in a checksummed message sent over TCP. I mean, you could incorrectly set it to 1, but you could incorrectly set all kinds of things in a client implementation, including the message contents. Not sure if I am missing something...

I actually share your distaste for some of the details of the compression implementation. But I think batch compression is fundamentally an invasive feature for a message-at-a-time system, so I am not sure if we can do better. I think we would be open to hearing alternative approaches, if fully thought through, though it would likely be a big change.

Here are the basic requirements from my point of view:
1. It must be batch compression. Single-message compression doesn't buy much for concise serialization formats and in any case can be implemented in the client.
2. Compression must be maintained both in the on-disk format and over the network. On disk, compression effectively triples the per-node cache size in our usage. We always produce to a local cluster, which then replicates cross-datacenter, so for us it is the consumer path that really must be compressed over the network. I am not sure that is a universal design, though, so ideally both producer and consumer should allow compression, although I think it is okay if the server decompresses and re-compresses in a different batch size.
3. It should not break the message-at-a-time API.

The alternative we considered was a paged log (e.g. stuffing messages into fixed-size pages). I am not sure if this is better or worse but we ended up rejecting it due to the complexity of implementation which would require splitting messages over pages on overflow etc.

                

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Lorenzo Alberton (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13423216#comment-13423216 ] 

Lorenzo Alberton commented on KAFKA-406:
----------------------------------------

Hi Jay, thanks for your comments. When I wrote that the compression flag could "slip" to 1, I of course meant that there's nothing in the code to prevent nested levels of compressed messages, as Jun noted. I'd add some checks to prevent this at write time.

I agree that batch compression and a message-at-a-time system can't play nicely with each other, so I don't have a better design to propose at this time.
The only alternative I can see would be to disallow mixing compressed and non-compressed messages in the same topic, and to have two different message-set iterators.

Anyway, for now I've completely revamped the PHP client library to work with the current design (tested against code in trunk), @see KAFKA-419, so at least we can proceed :-).
                

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13420283#comment-13420283 ] 

Jun Rao commented on KAFKA-406:
-------------------------------

The current code actually supports arbitrary levels of recursion of a message (i.e., message with compressed messages, each of which has compressed messages, etc). I'd agree that this is not a very useful feature and is potentially dangerous. We can probably add some code to limit the recursion to 2 levels. 
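
A minimal sketch of such a check at write time, in terms of the toy
compress() from the sketch under the issue description (illustrative
names, not the actual Kafka code):

    // Refuse to wrap anything that is already compressed: this caps the
    // nesting at the single level brokers and consumers actually expect.
    def compressChecked(messages: Seq[Message]): Message = {
      require(messages.forall(_.codec == NoCompression),
        "refusing to nest an already-compressed Message inside a compressed wrapper")
      compress(messages)
    }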
                

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13417304#comment-13417304 ] 

Neha Narkhede commented on KAFKA-406:
-------------------------------------

Yes, this was intentional. The idea was to represent a compressed set of messages just like a Kafka Message. To do this, we compress several messages into an array of bytes and treat that as the payload of a Message whose compression codec is GZIP or Snappy. This makes it easier to reason about compressed messages, since everything is a Message, with a byte in its header telling us whether it is compressed or not.
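
Putting the earlier toy sketches together, the representation described
here looks like this end to end (toy types and helpers from the sketches
above, not the real classes):

    val batch = Seq(Message("a".getBytes("UTF-8")), Message("b".getBytes("UTF-8")))
    val wrapper = compress(batch)      // a single Message, codec = GZIP
    assert(wrapper.codec == GzipCodec) // the header byte marks it compressed
    val inner = decompress(wrapper)    // gunzip, then re-parse full Messages
    assert(inner.map(m => new String(m.payload, "UTF-8")) == Seq("a", "b"))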
                

[jira] [Updated] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Lorenzo Alberton (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lorenzo Alberton updated KAFKA-406:
-----------------------------------

    Summary: Gzipped payload is a fully wrapped Message (with headers), not just payload  (was: Gzipped payload is a full wrapped Message (with headers), not just payload)
    

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Lorenzo Alberton (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13417367#comment-13417367 ] 

Lorenzo Alberton commented on KAFKA-406:
----------------------------------------

I see. It needs to be documented somewhere though, as people writing client libs might have trouble reverse-engineering the protocol. The fact that only gzipped messages are double-wrapped (requiring different handling depending on a flag) can be confusing.

This might be a stupid question, but if a Message can contain a MessageSet with more than one Message, how is the consumer supposed to iterate through them? Children of a MessageSet might be Message objects OR MessageSet objects?
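
For what it's worth: the children of a MessageSet are always Messages; it
is a compressed Message's payload that decompresses into another
serialized MessageSet. So a client can hide the nesting behind a
flattening iterator. A sketch in terms of the toy types above (not the
real consumer API):

    // Flatten wrapper Messages away so the application sees one plain
    // message at a time, however the batch was compressed.
    def deepIterator(topLevel: Iterator[Message]): Iterator[Message] =
      topLevel.flatMap { m =>
        if (m.codec == NoCompression) Iterator.single(m)
        else deepIterator(decompress(m).iterator) // recurse into the inner set
      }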
                

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Lorenzo Alberton (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13418163#comment-13418163 ] 

Lorenzo Alberton commented on KAFKA-406:
----------------------------------------

After sleeping on it, I think this is a really bad design decision. I appreciate that gzipping multiple messages together can lead to significant space savings, but I'm not convinced this is the right way to get them. Since a compressed message can contain a *collection* of messages, the symmetry with the non-compressed message interface is broken, and a linear log is turned into an odd tree structure. This can't even be classified as normal iterator polymorphism.

Three other very good reasons to rethink this design decision:

- as Michal also noted on the kafka-dev mailing list [1], the compression flag of a child of a compressed message could easily slip to 1, leading to endless recursion.

- the collection within a compressed message can't be partially consumed: you can't save an offset within the inner collection, as it would be an invalid offset for the Kafka log. The inner collection has to be consumed as a whole and the offset advanced to the next Message in the outer collection, breaking another important Kafka property.

- even if we only allowed a single message (instead of a collection) as the compressed payload of an outer Message, I don't see the need for the extra wrapping: the outer message has a CRC to verify that the gzipped payload is valid, and gzip itself has a CRC on the content; there's no need for a third CRC on the uncompressed message (a waste of space and CPU).

Thoughts?

Best,
-- 
Lorenzo Alberton
Chief Tech Architect
DataSift, Inc.


[1] http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/201207.mbox/%3CCAP5ZrEiDjUyhYuNpmh7Xck1dzdCROG_pEgdKbZdDV2yXrXxQAg%40mail.gmail.com%3Ec
                

[jira] [Commented] (KAFKA-406) Gzipped payload is a fully wrapped Message (with headers), not just payload

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13418395#comment-13418395 ] 

Jay Kreps commented on KAFKA-406:
---------------------------------

Oh yes, and the other design requirement we had was that messages not be re-compressed on a fetch request. A simple implementation without this requirement would just have the consumer request N messages, specifying whether to compress or not, and have the server read these into memory, decompress them if its local log format is compressed, then batch-compress exactly the messages the client asked for and send just that.

The problem with this is that we have about a 5x read-to-write ratio, so recompressing on each read means recompressing the same data 5 times on average. This makes consumption way more expensive. I don't think this is a hard requirement, but to make that approach fly we would have to demonstrate that the CPU overhead of compression would not become a serious bottleneck. I know this won't work with GZIP, but it might be possible with Snappy or a faster compression algo.
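
In terms of the toy sketches above, the rejected approach would look
roughly like this (illustrative only, not the real broker code):

    // Naive fetch path: decompress whatever the log holds (one level, for
    // simplicity), slice out exactly the n messages requested, and
    // recompress per request, paying the compression CPU on every read.
    def naiveFetch(log: Seq[Message], n: Int,
                   clientWantsCompressed: Boolean): Seq[Message] = {
      val plain = log.flatMap(decompress).take(n)
      if (clientWantsCompressed) Seq(compress(plain)) else plain
    }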
                