Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2019/12/02 20:02:01 UTC

KafkaStreams internal producer order guarantee

Hi everyone
In light of the discussions about order guarantees in Kafka, I am struggling
to understand how they affect KafkaStreams' internal *KafkaProducer*.
In the official documentation, this section (
https://docs.confluent.io/current/streams/concepts.html#out-of-order-handling)
enumerates two causes "that could potentially result in out-of-order data
*arrivals* with respect to their timestamps".
But I haven't found anything that mentions how KafkaStreams *producers*
handle errors, and how that could lead to out-of-order messages being
produced in output topics.
When I start my KafkaStreams application, I've seen the internal producers
use the following in their default configuration:
        enable.idempotence = false
        max.in.flight.requests.per.connection = 5
        retries = 2147483647

So I guess this could mean that, at the end of my topology, KafkaStreams
could potentially send out-of-order messages to an output topic if a
message fails to be delivered to the broker for some reason, as the
internal producer would retry it.

I've read that to guarantee order in the producers, one needs to set
"max.in.flight.requests.per.connection=1". But I wonder whether one should
override this configuration for KafkaStreams applications.
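
For clarity, this is the kind of override I have in mind. It's only a
sketch: the application id, bootstrap servers, and the choice between the
two options are placeholders on my part, not something I've verified is
necessary:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class OrderingOverride {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Option A: allow only one in-flight request per connection,
        // so a retried batch cannot overtake an earlier one
        props.put(StreamsConfig.producerPrefix(
                ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);

        // Option B (alternative): enable the idempotent producer, which
        // also preserves per-partition order across retries
        // props.put(StreamsConfig.producerPrefix(
        //         ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);

        return props;
    }
}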

Thanks
Murilo

Re: KafkaStreams internal producer order guarantee

Posted by John Roesler <vv...@apache.org>.
Hi Murilo,

For this case, you don’t have to worry. Kafka Streams provides the guarantee you want by default. 

Let us know if you want/need more information!

Cheers,
John

Re: KafkaStreams internal producer order guarantee

Posted by Murilo Tavares <mu...@gmail.com>.
Hi Matthias
Thank you for your feedback.
I'm still a bit confused about which approach one should take. My
application is pretty standard for KafkaStreams: it reads a few table-like
topics, then groups and aggregates some of them so we can join them with
others. Something like this:

KTable left = builder.table()
KTable right = builder.table()
var grouped = right.groupBy(/* new key/value */).aggregate(...)
left.leftJoin(grouped, /* myFunction */).toStream(...)
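
To make the shape concrete, here is a fleshed-out sketch of the same
topology. All topic names and types are made up for illustration, and
reduce() stands in for the elided aggregate(...):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class JoinTopologySketch {

    // Hypothetical re-keying helper; stands in for "new key/value"
    static String newKey(String value) {
        return value.split(":", 2)[0];
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Two table-like input topics (names are placeholders)
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");

        // Re-key the right-hand table and aggregate per new key;
        // reduce() is just a compilable stand-in for aggregate(...)
        KTable<String, String> grouped = right
                .groupBy((k, v) -> KeyValue.pair(newKey(v), v))
                .reduce((agg, v) -> v,      // adder: keep the newest value
                        (agg, v) -> agg);   // subtractor: keep the old one

        // Join and write the changelog stream to the output topic
        left.leftJoin(grouped, (l, g) -> l + "|" + g)
            .toStream()
            .to("output-topic");

        return builder.build();
    }
}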

The input and output topics are all table-like topics, so I understand I
need an "at least once" guarantee, but I also need an ordering guarantee,
at least per key. I mean, if you send 2 updates for the same key, I need a
guarantee that the output topic will end up with the latest value for that
key. Is there a recommended configuration for this?
Thanks again
Murilo

Re: KafkaStreams internal producer order guarantee

Posted by "Matthias J. Sax" <ma...@confluent.io>.
That is correct. It depends on what guarantees you need, though. Also
note that producers often write into repartition topics to re-key data,
and in this case no ordering guarantee can be provided anyway, as the
single-writer principle is "violated".
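
As a side note: if you want to see where such a repartition topic shows
up, you can print the topology description. A minimal sketch (topic name
and key mapping are made up):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class ShowRepartition {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table("input-topic");

        // Grouping by a new key forces Streams to write through an
        // internal "...-repartition" topic before aggregating
        table.groupBy((k, v) -> KeyValue.pair(v, k))
             .count();

        // The description lists the repartition topic between
        // the two sub-topologies
        System.out.println(builder.build().describe());
    }
}

Because records for the same new key can originate from different
upstream tasks, more than one producer may write into the same partition
of that repartition topic; that is the sense in which the single-writer
principle does not hold there.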

Also note that Kafka Streams can handle out-of-order data correctly for
most cases, and thus it should be OK to leave the default config values.

But as always, it depends on your application and your requirements. As a
rule of thumb: as long as you don't experience any issues, I would just go
with the default configs.


-Matthias

