Posted to users@kafka.apache.org by Ross Black <ro...@gmail.com> on 2012/06/01 05:05:24 UTC

Exactly-once semantics with compression

Hi,

Using SimpleConsumer, I get the offset of a message (from MessageAndOffset)
and persist it with my consumer data to get exactly-once semantics for
consumer state (as described in the Kafka design docs).  If the consumer
fails, it is simply a matter of replaying messages from the persisted
offset.

When using compression, the offset from MessageAndOffset appears to be the
offset of the compressed batch.  E.g., for a batch of 10 messages, the
offset returned for messages 1-9 is the start of the *current* batch, and
the offset for message 10 is the start of the *next* batch.

How can I get exactly-once semantics for consumer state?
Is there a way to get a batch of messages from SimpleConsumer?
(Otherwise I have to reconstruct a batch by watching for a change in the
offset between messages.)

Thanks,
Ross
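To make the problem concrete, here is a small simulation. It is a sketch under stated assumptions, not Kafka code: the class and record names and the offsets 0 and 100 are hypothetical, and each decompressed message is assumed to report the batch's start offset except the last, which reports the next batch's start, as described above. Persisting the reported offset after every message and crashing midway through the batch causes the already-processed messages to be replayed.

```java
import java.util.ArrayList;
import java.util.List;

public class CompressedOffsetReplay {
    /** A decompressed message as a deep iterator reports it (hypothetical model). */
    record Delivered(long offset, String payload) {}

    static final long BATCH_START = 0L;        // hypothetical start of the compressed batch
    static final long NEXT_BATCH_START = 100L; // hypothetical start of the following batch

    /** Messages 1..size-1 report the batch start; the last one reports the next batch start. */
    static List<Delivered> compressedBatch(int size) {
        List<Delivered> out = new ArrayList<>();
        for (int i = 1; i <= size; i++) {
            long offset = (i == size) ? NEXT_BATCH_START : BATCH_START;
            out.add(new Delivered(offset, "m" + i));
        }
        return out;
    }

    /** Simplified fetch: the whole batch if fromOffset is at or before its start, else nothing. */
    static List<Delivered> fetch(long fromOffset, int size) {
        return fromOffset <= BATCH_START ? compressedBatch(size) : List.of();
    }

    /**
     * Persists the reported offset after every message, "crashes" after crashAfter
     * messages, then restarts from the persisted offset. Returns every payload
     * processed, including any duplicates.
     */
    static List<String> processWithCrash(int size, int crashAfter) {
        List<String> processed = new ArrayList<>();
        long persisted = BATCH_START;
        for (Delivered d : fetch(persisted, size)) {
            if (processed.size() == crashAfter) {
                break; // simulated crash
            }
            processed.add(d.payload());
            persisted = d.offset(); // naive per-message checkpoint
        }
        for (Delivered d : fetch(persisted, size)) { // restart after the crash
            processed.add(d.payload());
            persisted = d.offset();
        }
        return processed;
    }

    public static void main(String[] args) {
        // Crash after 5 of 10 messages: the persisted offset is still the batch start,
        // so m1..m5 are processed twice.
        System.out.println(processWithCrash(10, 5).size());
        // Crash exactly at the batch boundary: nothing is replayed.
        System.out.println(processWithCrash(10, 10).size());
    }
}
```

Only a checkpoint taken at a batch boundary avoids the duplicates, which is why the batch boundary matters.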

Re: Exactly-once semantics with compression

Posted by Jun Rao <ju...@gmail.com>.
Since everything on the producer side can be done with Producer, it will
likely be the only public API for the producer. On the consumer side, since
the high-level consumer can't do everything, we may need to support
SimpleConsumer as a public API, although the exact API could change in 0.8.

Thanks,

Jun

On Sun, Jun 3, 2012 at 5:00 AM, Ross Black <ro...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Ross Black <ro...@gmail.com>.
Hi Jun,

Thanks.  I will stick with using the deep iterator then, so that I am not
affected by internal changes.

Are you able to comment on the following thread?
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201205.mbox/%3CCAM%2BbZhhjGSDuR9_4-rgbTx3tZ4B%2BHscjX%2B6STXp9kLZUVnj0PQ%40mail.gmail.com%3E

In particular, I wanted to check whether SyncProducer, AsyncProducer,
and SimpleConsumer are considered part of the "public" API, i.e. that they
will not disappear.

Thanks,
Ross


On 3 June 2012 02:19, Jun Rao <ju...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Jun Rao <ju...@gmail.com>.
You can get the same offsets using the deep iterator. Whenever the offset
increases, you know you have crossed a compressed-batch boundary.

Jun
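A minimal sketch of this offset-increase rule, assuming the fetch starts at a batch boundary and that offsets behave as described earlier in the thread. Delivered and Batch are hypothetical types standing in for MessageAndOffset; no Kafka classes are used. Each reconstructed batch carries the offset that is safe to persist once the whole batch has been processed.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBoundaries {
    /** A message from the deep iterator: reported offset plus payload (hypothetical model). */
    record Delivered(long offset, String payload) {}

    /** A reconstructed batch and the offset that is safe to persist after processing it. */
    record Batch(List<String> payloads, long resumeOffset) {}

    /**
     * Groups a deep-iterator stream into batches. Assumes the stream starts at a batch
     * boundary (fetchOffset) and that an increase in the reported offset marks the last
     * message of a batch. Trailing messages whose batch did not complete are not returned.
     */
    static List<Batch> groupByOffsetChange(long fetchOffset, List<Delivered> stream) {
        List<Batch> batches = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        long current = fetchOffset;
        for (Delivered d : stream) {
            pending.add(d.payload());
            if (d.offset() > current) { // offset increased: this message closes the batch
                batches.add(new Batch(List.copyOf(pending), d.offset()));
                pending.clear();
                current = d.offset();
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Delivered> stream = List.of(
            new Delivered(0, "a1"), new Delivered(0, "a2"), new Delivered(100, "a3"), // compressed batch
            new Delivered(130, "u1"),                                                 // uncompressed message
            new Delivered(130, "b1"), new Delivered(200, "b2"));                      // compressed batch
        for (Batch b : groupByOffsetChange(0, stream)) {
            // Process b.payloads() and persist b.resumeOffset() together with the resulting state.
            System.out.println(b.payloads() + " -> persist " + b.resumeOffset());
        }
    }
}
```

An uncompressed message advances the offset immediately, so under this rule it simply forms a batch of one.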

On Sat, Jun 2, 2012 at 12:18 AM, Ross Black <ro...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Ross Black <ro...@gmail.com>.
Hi Jun,

The only reason I would like the compressed messages exposed is so that I
know the batch boundary, and can therefore safely persist my state with the
offset. Is there a better way to achieve that?
In my (probably poor) example of exposing batch messages, the only things
you can do with a compressed message set are: get the offset, get the
serialized form, and iterate over the contained messages.

Is Kafka attempting to support exactly-once semantics? If so, it seems
that something should be exposed in the API to make batch boundaries more
explicit than having to track offset changes across individual messages.

Thanks,
Ross



On 2 June 2012 14:19, Jun Rao <ju...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Jun Rao <ju...@gmail.com>.
Ross,

The shallow iterator is intended for efficient mirroring between Kafka
clusters. I'm not sure it's a good idea to expose it as an external API.
Note that you really can't do much with a compressed message set other
than store it as raw bytes somewhere else.

Thanks,

Jun

On Thu, May 31, 2012 at 11:32 PM, Ross Black <ro...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Ross Black <ro...@gmail.com>.
Sorry, I meant my example to be:

e.g. ByteBufferMessageSet.batchIterator : Iterator<BatchMessage>
where BatchMessage is a simple extension of Message that has an additional
method for getting a ByteBufferMessageSet (i.e. it wraps the call to
CompressionUtils).
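Something close to this proposed batchIterator can be approximated on top of a deep iterator, without touching Kafka internals, by applying the offset-increase rule while iterating. In this sketch BatchMessage is a hypothetical standalone record rather than a subclass of Kafka's Message, payloads are plain strings, and iteration is assumed to start at a batch boundary.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchIteratorSketch {
    /** A message from the deep iterator (hypothetical model of MessageAndOffset). */
    record Delivered(long offset, String payload) {}

    /** Hypothetical stand-in for the proposed BatchMessage: its messages plus the offset to persist. */
    record BatchMessage(long offset, List<String> messages) {}

    /**
     * Adapts a deep iterator into an iterator of batches, reading one batch ahead.
     * Assumes iteration starts at a batch boundary (fetchOffset) and that an offset
     * increase ends a batch.
     */
    static Iterator<BatchMessage> batchIterator(Iterator<Delivered> deep, long fetchOffset) {
        return new Iterator<BatchMessage>() {
            private long current = fetchOffset;
            private BatchMessage next = advance(); // runs after current is initialized

            /** Reads messages until the offset increases; returns null if no complete batch remains. */
            private BatchMessage advance() {
                List<String> pending = new ArrayList<>();
                while (deep.hasNext()) {
                    Delivered d = deep.next();
                    pending.add(d.payload());
                    if (d.offset() > current) {
                        current = d.offset();
                        return new BatchMessage(current, List.copyOf(pending));
                    }
                }
                return null;
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public BatchMessage next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                BatchMessage result = next;
                next = advance();
                return result;
            }
        };
    }

    public static void main(String[] args) {
        List<Delivered> deep = List.of(
            new Delivered(0, "a1"), new Delivered(0, "a2"), new Delivered(100, "a3"),
            new Delivered(100, "b1"), new Delivered(160, "b2"));
        Iterator<BatchMessage> batches = batchIterator(deep.iterator(), 0);
        while (batches.hasNext()) {
            BatchMessage batch = batches.next();
            // Process batch.messages(), then persist batch.offset() with the resulting state.
            System.out.println(batch.messages() + " -> persist " + batch.offset());
        }
    }
}
```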


On 1 June 2012 16:32, Ross Black <ro...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Ross Black <ro...@gmail.com>.
Hi Jun.

I did find a way to process by batch, but it probably reaches a little too
deep into the internals of Kafka?

    FetchRequest request = new FetchRequest("topic", partitionNumber, requestOffset, bufferSize);
    ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
    Iterator<MessageAndOffset> batchIterator = messageSet.underlying().shallowIterator();
    while (batchIterator.hasNext()) {
        MessageAndOffset messageAndOffset = batchIterator.next();
        Message batchMessage = messageAndOffset.message();
        long offset = messageAndOffset.offset();
        Iterator<MessageAndOffset> messages = CompressionUtils.decompress(batchMessage).iterator();
        // process the batch of messages and persist with the offset
    }

This should work OK, but I am concerned that it uses internal Kafka
classes.  The code has to reach into the underlying (Scala)
ByteBufferMessageSet because shallowIterator is not exposed by the Java
variant.  The code also has to know that the message may be compressed
and then call CompressionUtils.

How likely is the above approach to work with subsequent releases?
Is it worth exposing the concept of batches in ByteBufferMessageSet to make
it explicit?

e.g. ByteBufferMessageSet.batchIterator : BatchMessage
where BatchMessage is a simple extension of Message that has an additional
method for getting a ByteBufferMessageSet (i.e. it wraps the call to
CompressionUtils).


Thoughts?

Thanks,
Ross



On 1 June 2012 14:51, Jun Rao <ju...@gmail.com> wrote:

Re: Exactly-once semantics with compression

Posted by Jun Rao <ju...@gmail.com>.
Ross,

With compression enabled, it's a bit hard to implement exactly-once, since
offsets are only advanced after a compressed batch of messages has been
consumed. So you will have to make sure that each batch of messages can be
consumed together as a unit. The other option is to compress with a batch
size of 1.

Thanks,

Jun
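One way to consume each batch as a unit is to apply it to a copy of the state and then write the new state and the batch's end offset together in a single step. The sketch below models that with an in-memory Checkpoint record; in a real consumer the single write would have to be atomic in the consumer's own store (for example a transaction), which is an assumption here and not something the thread specifies. Batch, Checkpoint, and the offsets are hypothetical.

```java
import java.util.List;

public class BatchAtomicCommit {
    /** A compressed batch: its start offset, the next batch's offset, and its decompressed values. */
    record Batch(long start, long next, List<Integer> values) {}

    /** Consumer state and offset stored together; replacing this reference models one atomic write. */
    record Checkpoint(long sum, long offset) {}

    /** Hypothetical log of two compressed batches. */
    static final List<Batch> LOG = List.of(
        new Batch(0, 100, List.of(1, 2, 3)),
        new Batch(100, 180, List.of(4, 5)));

    private Checkpoint checkpoint = new Checkpoint(0, 0);

    Checkpoint checkpoint() {
        return checkpoint;
    }

    /**
     * Replays every batch at or after the checkpointed offset. Each batch is applied to a
     * local copy of the state and committed with the batch's end offset in one step, so a
     * simulated crash inside the batch starting at crashAt leaves the last checkpoint intact.
     * Pass a negative crashAt to run without crashing.
     */
    void run(long crashAt) {
        for (Batch b : LOG) {
            if (b.start() < checkpoint.offset()) {
                continue; // already reflected in the checkpoint
            }
            long sum = checkpoint.sum();
            for (int i = 0; i < b.values().size(); i++) {
                if (b.start() == crashAt && i == 1) {
                    return; // simulated crash after applying part of the batch locally
                }
                sum += b.values().get(i);
            }
            checkpoint = new Checkpoint(sum, b.next()); // commit state + offset as one unit
        }
    }

    public static void main(String[] args) {
        BatchAtomicCommit consumer = new BatchAtomicCommit();
        consumer.run(100); // crashes partway through the second batch
        consumer.run(-1);  // restart: the second batch is replayed from offset 100
        System.out.println(consumer.checkpoint()); // Checkpoint[sum=15, offset=180]
    }
}
```

Compressing with a batch size of 1 is the degenerate case of the same scheme: every message is its own batch, so every reported offset is a safe checkpoint.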

On Thu, May 31, 2012 at 8:05 PM, Ross Black <ro...@gmail.com> wrote: