Posted to users@kafka.apache.org by Avi Flax <av...@aviflax.com> on 2016/02/18 22:00:20 UTC

Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Hello all, I have a question about Kafka Streams, which I’m evaluating
for a new project. (I know it’s still a work in progress but it might
work anyway for this project.)

I’m new to Kafka in particular and distributed systems in general so
please forgive me if I’m confused about any of these concepts.

From reading the docs on the new consumer API, I have the impression
that letting the consumer auto-commit is roughly akin to at-most-once
delivery, because a commit could occur past a record that wasn’t
actually processed. So in order to achieve at-least-once delivery, one
needs to employ “manual offset control” and explicitly commit after
processing has succeeded.
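
For concreteness, here's the sort of loop I have in mind. This is just a
sketch based on my reading of the new (0.9) consumer docs; the topic and
group names are made up and process() stands in for my application code:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");         // made-up group name
    props.put("enable.auto.commit", "false");  // "manual offset control"
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("my-topic")); // made up
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                process(record);   // placeholder for my processing step
            consumer.commitSync(); // commit only after processing succeeded
        }
    }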

If I’ve got that right, then that leads me to my question about KStreams.

From looking at the word count examples, it seems pretty clear that
using the “lower level” approach demonstrated in WordCountProcessorJob
— wherein a TopologyBuilder is created and supplied with a
ProcessorSupplier that supplies a Processor that receives a
ProcessorContext and calls commit on that ProcessorContext once
processing succeeds — enables the at-least-once delivery model. OK,
cool.
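
In other words, something like this rough sketch, modeled on
WordCountProcessorJob as I read it in current trunk (the topic name and
doWork() are made up, and I may have the pre-release signatures wrong):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class CommittingProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            doWork(key, value); // made-up application processing step
            context.commit();   // request a commit once processing succeeds
        }

        @Override
        public void punctuate(long timestamp) {}

        @Override
        public void close() {}
    }

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic"); // made-up topic name
    builder.addProcessor("process", CommittingProcessor::new, "source");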

Looking at WordCountJob, however, which uses KStreams, I don’t see any
committing happening there explicitly, and in fact I searched the
entire kstream source tree (for the kstream package and its internals
sub-package) and I don’t see any calls to commit there. So my
_impression_ is that maybe any KStream topology can use only
auto-commit, and therefore only at-most-once processing.

Basically I’m wondering if my impression is correct, or not, or if I’m
just totally misunderstanding the code in its current state.

Thanks!
Avi

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Jay Kreps <ja...@confluent.io>.
Yeah I didn't mean to imply that we committed after each poll, but rather
that when it was time to commit, this would happen on the next poll call
and hence only commit processed messages.

-Jay

On Thu, Feb 18, 2016 at 2:21 PM, Avi Flax <av...@aviflax.com> wrote:

> On Thu, Feb 18, 2016 at 4:26 PM, Jay Kreps <ja...@confluent.io> wrote:
> > The default semantics of the new consumer with auto commit are
> > at-least-once-delivery. Basically during the poll() call the commit will
> > be triggered and will commit the offset for the messages consumed during
> > the previous poll call. This is an advantage over the older scala consumer
> > where the consumer did not have this guarantee because the commit happened
> > asynchronously in a separate thread and hence could end up preceding or
> > succeeding the actual processing of data.
>
> Sorry, I just realized I have one follow-up question on this… if this
> is the case, then why does the new consumer have the config property
> `auto.commit.interval.ms`? The approach of committing automatically
> after an interval seems to me to be at odds with the approach of
> committing after each call to poll(). What am I missing here?
>
> Thanks!
> Avi
>

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Avi Flax <av...@aviflax.com>.
On Thu, Feb 18, 2016 at 8:03 PM, Jason Gustafson <ja...@confluent.io> wrote:
> The consumer is single-threaded, so we only trigger commits in the call to
> poll(). As long as you consume all the records returned from each poll
> call, the committed offset will never get ahead of the consumed offset, and
> you'll have at-least-once delivery. Note that the implication is that
> "auto.commit.interval.ms" is not strictly followed unless each iteration of
> the loop takes less time than the configured commit interval.

Ah, that makes sense. Thank you!

On Thu, Feb 18, 2016 at 8:30 PM, Jay Kreps <ja...@confluent.io> wrote:
> Yeah I didn't mean to imply that we committed after each poll, but rather
> that when it was time to commit, this would happen on the next poll call
> and hence only commit processed messages.

Ah, cool, got it. Thank you!

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Jason Gustafson <ja...@confluent.io>.
The consumer is single-threaded, so we only trigger commits in the call to
poll(). As long as you consume all the records returned from each poll
call, the committed offset will never get ahead of the consumed offset, and
you'll have at-least-once delivery. Note that the implication is that
"auto.commit.interval.ms" is not strictly followed unless each iteration of
the loop takes less time than the configured commit interval.
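
To illustrate with a minimal sketch (this assumes auto commit is enabled;
the topic and group names are made up and process() stands in for your
application code):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");            // made-up group name
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "5000");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic")); // made up
    while (true) {
        // Once at least auto.commit.interval.ms has elapsed, this poll()
        // call commits the offsets of the records returned by the previous
        // poll(), all of which were processed below before we got back here.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            process(record); // finish processing before the next poll()
    }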

-Jason

On Thu, Feb 18, 2016 at 2:21 PM, Avi Flax <av...@aviflax.com> wrote:

> On Thu, Feb 18, 2016 at 4:26 PM, Jay Kreps <ja...@confluent.io> wrote:
> > The default semantics of the new consumer with auto commit are
> > at-least-once-delivery. Basically during the poll() call the commit will
> > be triggered and will commit the offset for the messages consumed during
> > the previous poll call. This is an advantage over the older scala consumer
> > where the consumer did not have this guarantee because the commit happened
> > asynchronously in a separate thread and hence could end up preceding or
> > succeeding the actual processing of data.
>
> Sorry, I just realized I have one follow-up question on this… if this
> is the case, then why does the new consumer have the config property
> `auto.commit.interval.ms`? The approach of committing automatically
> after an interval seems to me to be at odds with the approach of
> committing after each call to poll(). What am I missing here?
>
> Thanks!
> Avi
>

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Avi Flax <av...@aviflax.com>.
On Thu, Feb 18, 2016 at 4:26 PM, Jay Kreps <ja...@confluent.io> wrote:
> The default semantics of the new consumer with auto commit are
> at-least-once-delivery. Basically during the poll() call the commit will be
> triggered and will commit the offset for the messages consumed during the
> previous poll call. This is an advantage over the older scala consumer
> where the consumer did not have this guarantee because the commit happened
> asynchronously in a separate thread and hence could end up preceding or
> succeeding the actual processing of data.

Sorry, I just realized I have one follow-up question on this… if this
is the case, then why does the new consumer have the config property
`auto.commit.interval.ms`? The approach of committing automatically
after an interval seems to me to be at odds with the approach of
committing after each call to poll(). What am I missing here?

Thanks!
Avi

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Avi Flax <av...@aviflax.com>.
On Thu, Feb 18, 2016 at 4:26 PM, Jay Kreps <ja...@confluent.io> wrote:
> The default semantics of the new consumer with auto commit are
> at-least-once-delivery. Basically during the poll() call the commit will be
> triggered and will commit the offset for the messages consumed during the
> previous poll call. This is an advantage over the older scala consumer
> where the consumer did not have this guarantee because the commit happened
> asynchronously in a separate thread and hence could end up preceding or
> succeeding the actual processing of data.
>
> For streams this is exactly the same, and the guarantee is also
> at-least-once-delivery (we will work on strengthening this in the future).

Thank you for the rapid, clear, thorough, and satisfying answer!

Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

Posted by Jay Kreps <ja...@confluent.io>.
The default semantics of the new consumer with auto commit are
at-least-once-delivery. Basically during the poll() call the commit will be
triggered and will commit the offset for the messages consumed during the
previous poll call. This is an advantage over the older scala consumer
where the consumer did not have this guarantee because the commit happened
asynchronously in a separate thread and hence could end up preceding or
succeeding the actual processing of data.

For streams this is exactly the same, and the guarantee is also
at-least-once-delivery (we will work on strengthening this in the future).

-Jay

On Thu, Feb 18, 2016 at 1:00 PM, Avi Flax <av...@aviflax.com> wrote:

> Hello all, I have a question about Kafka Streams, which I’m evaluating
> for a new project. (I know it’s still a work in progress but it might
> work anyway for this project.)
>
> I’m new to Kafka in particular and distributed systems in general so
> please forgive me if I’m confused about any of these concepts.
>
> From reading the docs on the new consumer API, I have the impression
> that letting the consumer auto-commit is roughly akin to at-most-once
> delivery, because a commit could occur past a record that wasn’t
> actually processed. So in order to achieve at-least-once delivery, one
> needs to employ “manual offset control” and explicitly commit after
> processing has succeeded.
>
> If I’ve got that right, then that leads me to my question about KStreams.
>
> From looking at the word count examples, it seems pretty clear that
> using the “lower level” approach demonstrated in WordCountProcessorJob
> — wherein a TopologyBuilder is created and supplied with a
> ProcessorSupplier that supplies a Processor that receives a
> ProcessorContext and calls commit on that ProcessorContext once
> processing succeeds — enables the at-least-once delivery model. OK,
> cool.
>
> Looking at WordCountJob, however, which uses KStreams, I don’t see any
> committing happening there explicitly, and in fact I searched the
> entire kstream source tree (for the kstream package and its internals
> sub-package) and I don’t see any calls to commit there. So my
> _impression_ is that maybe any KStream topology can use only
> auto-commit, and therefore only at-most-once processing.
>
> Basically I’m wondering if my impression is correct, or not, or if I’m
> just totally misunderstanding the code in its current state.
>
> Thanks!
> Avi
>