Posted to users@kafka.apache.org by Dumitru-Nicolae Marasoui <Ni...@kaluza.com> on 2020/07/14 12:08:10 UTC

is ktable-ktable join still having the risk of non determinism?

In
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin.1
it is mentioned that "Pay attention, that the KTable lookup is done on the
*current* KTable state, and thus, out-of-order records can yield
non-deterministic result. Furthermore, in practice Kafka Streams does not
guarantee that all records will be processed in timestamp order (even if
processing records in timestamp order is the goal, it is only best effort)."

Is this still a valid concern?
Can you give a few examples of how this may happen and what the end result
would look like? I guess non-determinism in this case means that the end
result (the eventual result) can be one of many possible combinations?

Which version(s) of Kafka Streams have this concern? All of them, right?
(Is there any difference between the open source and Confluent versions?)

Thank you

-- 

Dumitru-Nicolae Marasoui

Software Engineer



w kaluza.com <https://www.kaluza.com/>

LinkedIn <https://www.linkedin.com/company/kaluza> | Twitter
<https://twitter.com/Kaluza_tech>

Kaluza Ltd. registered in England and Wales No. 08785057

VAT No. 100119879

Help save paper - do you need to print this email?

Re: is ktable-ktable join still having the risk of non determinism?

Posted by "Matthias J. Sax" <mj...@apache.org>.
The answer has several parts:

(1) A final result exists only if the input topics stop receiving new
data. In that case, the result is deterministic. You can consider it
eventually consistent.

The non-determinism really only applies to "intermediate results".

The two input-topic partitions of a join are never processed concurrently.
Instead, a single thread computes the join, alternating between records
from both inputs. Thus, no data can be missed due to a race condition, and
the result will always be complete.
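
To illustrate the point above, here is a minimal sketch in plain Java. It is
not the Kafka Streams implementation: the class KTableJoinSketch, the Update
type, and the run method are hypothetical names, and the two tables are
modelled as in-memory maps. It assumes an inner join in which each update is
stored and then looked up against the current state of the other table, on a
single thread. The same three records are fed in two different interleavings;
the intermediate results differ, but the last emitted result is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an inner KTable-KTable join (illustration only).
final class KTableJoinSketch {

    /** One input record: which side it belongs to, plus key and value. */
    static final class Update {
        final boolean left;
        final String key;
        final String value;

        Update(boolean left, String key, String value) {
            this.left = left;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Processes updates one at a time on a single thread. Each update is
     * stored in its own table first and then joined against the current
     * state of the other table. Returns all emitted results in order.
     */
    static List<String> run(List<Update> updates) {
        Map<String, String> leftTable = new HashMap<>();
        Map<String, String> rightTable = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Update u : updates) {
            (u.left ? leftTable : rightTable).put(u.key, u.value);
            String l = leftTable.get(u.key);
            String r = rightTable.get(u.key);
            if (l != null && r != null) {
                emitted.add(u.key + "=" + l + "+" + r);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Update l1 = new Update(true, "k", "L1");
        Update l2 = new Update(true, "k", "L2");
        Update r1 = new Update(false, "k", "R1");
        // Same records, two different interleavings of the two inputs.
        System.out.println(run(Arrays.asList(l1, r1, l2))); // [k=L1+R1, k=L2+R1]
        System.out.println(run(Arrays.asList(l1, l2, r1))); // [k=L2+R1]
    }
}
```

In the first interleaving an extra intermediate result (k=L1+R1) is emitted;
in both cases the final result for key k is k=L2+R1, and no pair is lost.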

For more details, there will be a talk at Kafka Summit about how time is
handled in Kafka Streams:  "The Flux Capacitor of Kafka Streams and
ksqlDB" (https://events.kafka-summit.org/2020-schedule)


(2) In newer releases (2.1 and newer; cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization)
timestamp synchronization was improved, and thus the probability of
non-determinism is reduced. You can also set the config parameter
`max.task.idle.ms` (default is 0) to improve the processing order
guarantees.
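
As a rough sketch of how that parameter might be set, the following builds a
java.util.Properties object of the kind passed to a Kafka Streams application.
The application id, the bootstrap address, and the value of 500 ms are
assumptions chosen for illustration, not recommendations; the class name
IdleConfigSketch is hypothetical. Plain string keys are used so the snippet
compiles without the Kafka libraries on the classpath.

```java
import java.util.Properties;

// Illustration only: builds the configuration, does not start an application.
final class IdleConfigSketch {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "ktable-join-demo");   // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical address
        // Let a task wait up to 500 ms for data on all of its input
        // partitions before processing, instead of the default of 0.
        // The value 500 is an arbitrary example.
        props.put("max.task.idle.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms")); // 500
    }
}
```

With the Kafka Streams library available, the same key is exposed as the
constant StreamsConfig.MAX_TASK_IDLE_MS_CONFIG. A larger value trades latency
for a better chance of processing both inputs in timestamp order.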

There are still some corner cases that don't work perfectly, and we plan
to address them soon (hopefully in the 2.7 release).


(3) Out-of-order records may still be problematic. However, this is
usually only the case if records with the same key are out of order within
the same partition. If records with different keys are out of order in the
same partition, the issue is less severe. However, given the timestamp
synchronization, out-of-order records may still lead to problems
synchronizing the processing between both inputs. It's on our roadmap to
improve out-of-order handling for this case, but it's rather complex and
there is no timeline yet.
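
A small sketch of why the same-key case is the problematic one. It assumes
that a table applies updates in arrival (offset) order without comparing
timestamps, so the last arrival per key wins; that is an assumption of this
illustration, and the names OutOfOrderSketch, Rec, and materialize are
hypothetical. The table is modelled as an in-memory map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a table that keeps the last arrival per key.
final class OutOfOrderSketch {

    /** A record with a key, a value, and an event timestamp. */
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Applies records in arrival order; timestamps are ignored. */
    static Map<String, String> materialize(List<Rec> partition) {
        Map<String, String> table = new HashMap<>();
        for (Rec r : partition) {
            table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        // Same key: the record with the older timestamp arrives last
        // and overwrites the newer value.
        List<Rec> sameKey = Arrays.asList(
                new Rec("a", "new", 20L), new Rec("a", "old", 10L));
        System.out.println(materialize(sameKey).get("a")); // old

        // Different keys: the out-of-order record touches another key,
        // so the table content does not depend on the order.
        List<Rec> differentKeys = Arrays.asList(
                new Rec("a", "new", 20L), new Rec("b", "other", 10L));
        System.out.println(materialize(differentKeys).get("a")); // new
    }
}
```

In the same-key case the table ends up holding the value that is stale by
timestamp, and a join would then read that stale value. In the different-key
case the final table content is the same as with in-order arrival.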


Btw: Confluent ships vanilla Kafka Streams and there is no difference
between the two.


-Matthias

On 7/15/20 3:42 AM, Dumitru-Nicolae Marasoui wrote:
> Hello Kafka community,
>
> In the KTable-KTable join documentation from an older version, the cwiki
> mentions:
> “Pay attention, that the KTable lookup is done on the current KTable state,
> and thus, out-of-order records can yield non-deterministic result.
> Furthermore, in practice Kafka Streams does not guarantee that all records
> will be processed in timestamp order (even if processing records in
> timestamp order is the goal, it is only best effort).”
>
> Is this still valid? What does it mean? Is it only about a temporary glitch
> in the emitted data, or does it affect the eventual result (assuming the
> output is written to compacted topics)? Can we expect things to become
> eventually consistent, with that eventual state present in the compacted
> output topic?
>
> What kinds of inconsistencies or data loss in the join output can we
> expect, if any? Are all the joined records going to be output eventually?
> Or is there a possible race condition where, with the default KTable join,
> concurrent processing of messages causes a pair never to be emitted? For
> instance, two messages arrive in the t1 and t2 topics; they are processed
> concurrently, and each is joined with the local state of the other table,
> which does not yet contain its pair. At this stage, permanent data loss
> seems possible. Or does Kafka Streams take this into account, for instance
> when replicating the changelog, detect that a pair has been added on
> another node, and emit that pair?
>
> Is this join commutative (a sort of semigroup, like CRDTs), so that any
> concurrency in processing results in a consistent eventual state?
>
> Thank you

Re: is ktable-ktable join still having the risk of non determinism?

Posted by Dumitru-Nicolae Marasoui <Ni...@kaluza.com>.
Hello Kafka community,

In the KTable-KTable join documentation from an older version, the cwiki
mentions:
“Pay attention, that the KTable lookup is done on the current KTable state,
and thus, out-of-order records can yield non-deterministic result.
Furthermore, in practice Kafka Streams does not guarantee that all records
will be processed in timestamp order (even if processing records in
timestamp order is the goal, it is only best effort).”

Is this still valid? What does it mean? Is it only about a temporary glitch
in the emitted data, or does it affect the eventual result (assuming the
output is written to compacted topics)? Can we expect things to become
eventually consistent, with that eventual state present in the compacted
output topic?

What kinds of inconsistencies or data loss in the join output can we
expect, if any? Are all the joined records going to be output eventually?
Or is there a possible race condition where, with the default KTable join,
concurrent processing of messages causes a pair never to be emitted? For
instance, two messages arrive in the t1 and t2 topics; they are processed
concurrently, and each is joined with the local state of the other table,
which does not yet contain its pair. At this stage, permanent data loss
seems possible. Or does Kafka Streams take this into account, for instance
when replicating the changelog, detect that a pair has been added on
another node, and emit that pair?

Is this join commutative (a sort of semigroup, like CRDTs), so that any
concurrency in processing results in a consistent eventual state?

Thank you

