Posted to users@kafka.apache.org by Tomasz Gac <to...@empirica.io.INVALID> on 2022/11/15 10:20:24 UTC

Re: Out of order messages when kafka streams application catches up

Hi John,

I've reviewed the test you sent and it seems to be correct, but it may not
reproduce our setup.

We are using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
against the kafka broker version 2.1.1. We are running it as an OSGi bundle
with dependencies packaged within the bundle.

Thank you,
Tomasz


Fri, Sep 30, 2022 at 17:57 John Roesler <vv...@apache.org> wrote:

> Hi again, Tomasz,
>
> Your issue is really bugging me, since I'm pretty sure it shouldn't be
> happening.
>
> I went ahead and added an integration test with your exact scenario, as I
> understand it: https://github.com/apache/kafka/pull/12706
>
> The test passes for me.
>
> Do you think you can check it out and try adjusting the test setup until
> you're able to reproduce the behavior you're seeing? If you can do that, I
> think we will get to the bottom of it.
>
> Thanks,
> -John
>
> On Fri, Sep 30, 2022, at 09:51, John Roesler wrote:
> > Hi Tomasz,
> >
> > Thanks for trying that out. It’s not the way I’d expect it to work. I
> > don’t remember if there were any follow-up bugs that have been solved
> > in subsequent releases. Just as a long shot, perhaps you can try it on
> > the latest release (3.3.0)?
> >
> > Otherwise, I think the best path forward would be to file a bug report
> > on the Apache Kafka Jira with enough information to reproduce the issue
> > (or if you’re able to provide a repro, that would be awesome).
> >
> > Thanks, and sorry for the trouble.
> > -John
> >
> > On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
> >> I upgraded to kafka streams 3.0.0 with a positive max.task.idle.ms and
> >> it did not help.
> >> When the lag is large, the application still consumes batches of data
> >> without interleaving.
> >>
> >>
> >>
> >> Tue, Sep 27, 2022 at 05:51 John Roesler <vv...@apache.org> wrote:
> >>
> >>> Hi Tomasz,
> >>>
> >>> Thanks for asking. This sounds like the situation that we fixed in
> >>> Apache Kafka 3.0, with KIP-695 (
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
> >>> ).
> >>>
> >>> Can you try upgrading and let us know if this fixes the problem?
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> >>> > Hi group,
> >>> >
> >>> > I wrote a simple kafka streams application with a topology such as
> >>> > the one below:
> >>> >
> >>> > builder.addStateStore(
> >>> >     Stores.keyValueStoreBuilder(
> >>> >         Stores.persistentKeyValueStore("STORE"),
> >>> >         Serdes.String(), Serdes.String())
> >>> >     .withLoggingEnabled(storeConfig));
> >>> >
> >>> > builder.stream("TOPIC_1", Consumed.with(...))
> >>> >     .merge(builder.stream("TOPIC_2", Consumed.with(...)))
> >>> >     .merge(builder.stream("TOPIC_3", Consumed.with(...)))
> >>> >     .map(...)                 // stateless
> >>> >     .transform(..., "STORE")  // stateful
> >>> >     .to("TOPIC_4");
> >>> >
> >>> >
> >>> > All input topics have 6 partitions, and for the purpose of testing,
> >>> > we are producing data to partition number 5.
> >>> > We are using kafka streams version 2.8.1, broker version 2.12-2.1.1.
> >>> >
> >>> > The application works as expected when it has caught up to the lag,
> >>> > e.g. when the reset tool is used with the --to-latest parameter.
> >>> > However, when the application is processing the messages starting
> >>> > from the earliest offset, the inputs arrive in batches such as:
> >>> >
> >>> >    - ~1000 messages from TOPIC_1
> >>> >    - ~1000 messages from TOPIC_2
> >>> >    - ~1000 messages from TOPIC_3
> >>> >
> >>> > All of the messages have timestamps provided in headers, so I would
> >>> > expect the application to interleave the messages from these three
> >>> > topics so that their timestamps are in ascending order.
> >>> > However, this is not what I am observing. The messages are
> >>> > processed in batches.
> >>> >
> >>> > How do I configure my application so that it processes messages in
> >>> > order when it is catching up to the lag?
> >>>
>
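[Editor's note: the interleaving behavior Tomasz expects can be sketched in plain Java. This is not the actual Streams internals, just an illustration of the timestamp-based record choice that KIP-695 describes; the `TimestampedRecord` type and `merge` helper are hypothetical names. The key point is that picking the globally smallest timestamp is only well-defined while every input buffer has data, which is why a task must idle when one partition is empty.]

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

// Hypothetical record type: just a timestamp and the topic it came from.
record TimestampedRecord(long timestamp, String topic) {}

public class TimestampMerge {

    // Drain the per-partition buffers in timestamp order: repeatedly take the
    // record with the smallest head timestamp. The loop stops as soon as any
    // buffer is empty, because the "smallest timestamp" choice is no longer
    // well-defined -- a later record on the empty partition might be smaller.
    public static List<TimestampedRecord> merge(List<Deque<TimestampedRecord>> buffers) {
        List<TimestampedRecord> out = new ArrayList<>();
        while (buffers.stream().noneMatch(Deque::isEmpty)) {
            Deque<TimestampedRecord> next = buffers.stream()
                    .min(Comparator.comparingLong(
                            (Deque<TimestampedRecord> b) -> b.peekFirst().timestamp()))
                    .orElseThrow();
            out.add(next.pollFirst());
        }
        return out;
    }
}
```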

Re: Out of order messages when kafka streams application catches up

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Tomasz, you'll need to upgrade the Kafka Streams dependency to 3.0 (or above)
to get the fix that John mentioned before -- this behavior is known/expected
on earlier versions such as the 2.8.1 you are using.
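[Editor's note: the 3.0+ behavior discussed in this thread is controlled by the `max.task.idle.ms` config. A minimal sketch of setting it; the `application.id` and `bootstrap.servers` values are placeholders for illustration.]

```java
import java.util.Properties;

public class StreamsUpgradeConfig {

    // Build the Streams configuration as a plain Properties object.
    public static Properties props() {
        Properties p = new Properties();
        p.put("application.id", "merge-by-timestamp-app");   // placeholder
        p.put("bootstrap.servers", "localhost:9092");        // placeholder
        // On Kafka Streams 3.0+ (KIP-695), max.task.idle.ms = 0 (the default)
        // makes a task wait for data already fetched on all input partitions
        // before picking the record with the lowest timestamp; a positive
        // value additionally waits that long for the broker to return data.
        p.put("max.task.idle.ms", "1000");
        return p;
    }
}
```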