Posted to users@kafka.apache.org by Richard Rossel <he...@gmail.com> on 2020/07/08 17:54:37 UTC

Detected out-of-order KTable update warnings

Hi there,

I'm getting a lot of warnings of this type:


 WARN org.apache.kafka.streams.kstream.internals.KTableSource - Detected
out-of-order KTable update for entity-STATE-STORE-0000000000 at offset
65806, partition 5.

It looks like the warning is generated each time a new record goes into the
source topic that is used to load a GlobalKTable, and it only happens
when the source topic is updated very frequently (every second).

I would like to know what that warning means; maybe I'm not
loading the KTable correctly.

My process is basically a simple enrichment process (using Kafka 2.4):
...

 GlobalKTable<String, String> entity = builder.globalTable(entityTopic);

 KStream<String, String> joined = dataStream.join(entity,
         (leftKey, leftValue) -> leftKey,
         joiner);

 joined.to(enrichedTopic);
...
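
For context, a self-contained version of this topology could look roughly
like the sketch below. The topic names, serdes, joiner, application id, and
broker address are placeholders for illustration, not values from the post:

 import java.util.Properties;

 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;

 public class EnrichmentApp {
     public static void main(String[] args) {
         final String entityTopic = "entity";        // placeholder topic names
         final String dataTopic = "data";
         final String enrichedTopic = "enriched";

         final StreamsBuilder builder = new StreamsBuilder();

         // Lookup table keyed by username, loaded from the entity topic.
         final GlobalKTable<String, String> entity = builder.globalTable(
                 entityTopic, Consumed.with(Serdes.String(), Serdes.String()));

         // Event stream keyed by the same username.
         final KStream<String, String> dataStream = builder.stream(
                 dataTopic, Consumed.with(Serdes.String(), Serdes.String()));

         // Join each event against the table by record key; this placeholder
         // joiner simply concatenates both values.
         final KStream<String, String> joined = dataStream.join(
                 entity,
                 (leftKey, leftValue) -> leftKey,
                 (leftValue, rightValue) -> leftValue + "|" + rightValue);

         joined.to(enrichedTopic, Produced.with(Serdes.String(), Serdes.String()));

         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
     }
 }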

The entityTopic and the topic used by dataStream share the same key
type (username), and the entityTopic receives entries from another
process very frequently.

Do you think those warning messages are caused by the way I'm creating the
KTable?

Thanks.-
-- 
Richard Rossel
Atlanta - GA

Re: Detected out-of-order KTable update warnings

Posted by Richard Rossel <he...@gmail.com>.
Configuring the topic to let the broker set the timestamps should fix
my problem.
Thanks for the explanation and for sharing your talk.

On Fri, Jul 10, 2020 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:


-- 
Richard Rossel
Atlanta - GA

Re: Detected out-of-order KTable update warnings

Posted by "Matthias J. Sax" <mj...@apache.org>.
By default, timestamps are set by the producer.

Thus, if different producers write to the same topic-partition, their
writes are interleaved, and even if each individual client sends its data
in order, the records may end up out of order in the topic.
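
To make that concrete, here is a rough sketch of the producer side; the
topic, key, values, and broker address are placeholders. With the default
CreateTime setting, the timestamp Kafka Streams sees is whatever the
producing client assigns, either explicitly or implicitly at send time:

 import java.util.Properties;

 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;

 public class TimestampedProducer {
     public static void main(String[] args) {
         final Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             // Explicit client-side timestamp: if this machine's clock is behind,
             // its updates for "username-1" carry older timestamps than updates
             // written by a machine whose clock is ahead.
             producer.send(new ProducerRecord<>("entity", null,
                     System.currentTimeMillis(), "username-1", "profile-v2"));

             // Without an explicit timestamp, the producer stamps the record
             // with its local wall-clock time at send.
             producer.send(new ProducerRecord<>("entity", "username-1", "profile-v3"));
         }
     }
 }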

As you are only interested in per-key ordering, it might be sufficient
to ensure that data for a specific key is only written by a single
producer. You might still have out-of-order records across different keys,
but per-key ordering should be sufficient.

Or, if you don't care about the race condition between two producers
updating the same key, you could also reconfigure the topic and let the
broker set the timestamp.
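
A hedged sketch of that topic change via the Java AdminClient is below; the
topic name and broker address are placeholders. Setting message.timestamp.type
to LogAppendTime makes the partition leader stamp every record when it is
appended to the log, regardless of the producers' clocks:

 import java.util.Collections;
 import java.util.Properties;

 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.config.ConfigResource;

 public class UseLogAppendTime {
     public static void main(String[] args) throws Exception {
         final Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

         try (AdminClient admin = AdminClient.create(props)) {
             final ConfigResource topic =
                     new ConfigResource(ConfigResource.Type.TOPIC, "entity");
             final AlterConfigOp setLogAppendTime = new AlterConfigOp(
                     new ConfigEntry("message.timestamp.type", "LogAppendTime"),
                     AlterConfigOp.OpType.SET);
             // Apply the config change and block until the broker confirms it.
             admin.incrementalAlterConfigs(
                     Collections.singletonMap(topic, Collections.singletonList(setLogAppendTime)))
                  .all()
                  .get();
         }
     }
 }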

Another alternative is to use a different timestamp extractor and
change the timestamps on read.
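
For example, a minimal sketch of the extractor approach, using the built-in
WallclockTimestampExtractor so that records read for the table are
re-stamped with the wall-clock time at read (the topic name is a
placeholder):

 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

 public class ExtractorExample {
     public static void main(String[] args) {
         final StreamsBuilder builder = new StreamsBuilder();

         // Only this source uses the custom extractor; other sources keep the
         // default behavior of using the record's embedded timestamp.
         final GlobalKTable<String, String> entity = builder.globalTable(
                 "entity",
                 Consumed.with(Serdes.String(), Serdes.String())
                         .withTimestampExtractor(new WallclockTimestampExtractor()));
     }
 }

A custom class implementing org.apache.kafka.streams.processor.TimestampExtractor
could instead pull a timestamp out of the record payload, if the payload
carries one.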

Shameless plug: I did a talk about this topic that might be helpful ->
https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/


-Matthias

On 7/10/20 7:00 AM, Richard Rossel wrote:


Re: Detected out-of-order KTable update warnings

Posted by Richard Rossel <he...@gmail.com>.
Thanks Matthias, that makes sense; now I need to find out why the
topic is not sorted by timestamp.

The topic I'm loading as a GlobalKTable is partitioned by key,
and the timestamps are handled by Kafka.
I have multiple clients on different machines pushing data to that topic;
maybe the problem is a time synchronization issue between machines.

Do you know if the timestamp associated with each record on the topic is
set by the clients pushing the data, or by the Kafka broker that leads
that partition?


Thanks.-



On Thu, Jul 9, 2020 at 4:45 PM Matthias J. Sax <mj...@apache.org> wrote:


-- 
Richard Rossel
Atlanta - GA

Re: Detected out-of-order KTable update warnings

Posted by "Matthias J. Sax" <mj...@apache.org>.
If you load data into a KTable or GlobalKTable, it's expected that the
data is partitioned by key, and that records with the same key have
non-descending timestamps.

If a record with, let's say, key A and timestamp 5 is put into the table,
and later a record with key A and timestamp 4 is put into the table, the
second record is an out-of-order record and a warning is logged for this
case, as it might indicate that your data is not ordered correctly, which
might lead to incorrect/unexpected join results.

If data is out-of-order, your table content semantically goes (partially)
back in time, which is usually undesired -- your table content should
usually only go forward in time.
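
A minimal, hedged sketch that reproduces this scenario with the
kafka-streams-test-utils TopologyTestDriver (topic name and application id
are made up for the example): piping key A at timestamp 5 and then key A at
timestamp 4 triggers the same KTableSource warning.

 import java.time.Instant;
 import java.util.Properties;

 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;

 public class OutOfOrderDemo {
     public static void main(String[] args) {
         final StreamsBuilder builder = new StreamsBuilder();
         // Same shape as the original topology: a global table fed from one topic.
         builder.globalTable("entity", Consumed.with(Serdes.String(), Serdes.String()));

         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "out-of-order-demo");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // unused by the driver

         try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> input = driver.createInputTopic(
                     "entity", new StringSerializer(), new StringSerializer());

             input.pipeInput("A", "first", Instant.ofEpochMilli(5));
             // Timestamp goes backwards for the same key -> out-of-order warning.
             input.pipeInput("A", "second", Instant.ofEpochMilli(4));
         }
     }
 }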

Does this help?


-Matthias


