Posted to jira@kafka.apache.org by "Nico Habermann (Jira)" <ji...@apache.org> on 2021/03/19 05:08:00 UTC

[jira] [Updated] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once

     [ https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Habermann updated KAFKA-12508:
-----------------------------------
    Description: 
[KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] added emit-on-change semantics to KTables, suppressing downstream updates whose value is unchanged.

However, this may cause data loss in at_least_once topologies when records are reprocessed from the last committed offset after an error, a restart, or similar.

 

Consider the following example:
{code:java}
streams.table(source, materialized)
       .toStream()
       .map(mayThrow())
       .to(output)
{code}
 
 # Record A gets read
 # Record A is stored in the table
 # The update for record A is forwarded through the topology
 # map() throws (or, equivalently, the application restarts while the forwarded update is still in flight and has not yet been produced to the output topic)
 # The stream is restarted and "retries" from the last commit
 # Record A gets read again
 # The table will discard the update for record A because
 ## The value is the same
 ## The timestamp is the same
 # Eventually the stream will commit
 # There is absolutely no output for Record A even though we're running in at_least_once
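The sequence above can be reproduced with a tiny in-memory stand-in for the table's emit-on-change check (names like {{RetryLossDemo}} and {{update}} are hypothetical and purely illustrative, not Kafka code):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RetryLossDemo {

    // In-memory stand-in for the materialized table: key -> last (value, timestamp).
    static class Stored {
        final String value;
        final long ts;
        Stored(String value, long ts) { this.value = value; this.ts = ts; }
    }

    static final Map<String, Stored> table = new HashMap<>();

    // Emit-on-change check as described above: the update is suppressed when
    // the stored value is identical and the incoming timestamp is not
    // strictly older than the stored one.
    static boolean update(String key, String value, long ts) {
        Stored old = table.get(key);
        if (old != null && Objects.equals(old.value, value) && ts >= old.ts) {
            return false; // suppressed: nothing is forwarded downstream
        }
        table.put(key, new Stored(value, ts));
        return true; // forwarded downstream
    }

    public static void main(String[] args) {
        // First attempt: record A is stored and forwarded, then map() throws
        // before the result ever reaches the output topic.
        System.out.println(update("A", "v1", 100L)); // true

        // Retry from the last commit: record A is read again, but the table
        // already holds the same value and timestamp, so the update is
        // suppressed and never re-forwarded.
        System.out.println(update("A", "v1", 100L)); // false -> output lost
    }
}
{code}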

 

This behaviour does not seem intentional. [The emit-on-change logic explicitly forwards records that have the same value and an older timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50] This logic should probably be changed to also forward updates that have an older *or equal* timestamp.
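The proposed fix amounts to changing one comparison. A minimal sketch (the method names {{shouldDropCurrent}} / {{shouldDropProposed}} are hypothetical; the real check lives in {{ValueAndTimestampSerializer}}, linked above):
{code:java}
import java.util.Arrays;

public class EmitOnChangeCheck {

    // Behaviour as reported: the update is dropped when the serialized
    // values are equal and the new timestamp is NOT strictly older, so a
    // retried record with identical value and timestamp is silently lost.
    static boolean shouldDropCurrent(byte[] oldValue, long oldTs,
                                     byte[] newValue, long newTs) {
        return Arrays.equals(oldValue, newValue) && newTs >= oldTs;
    }

    // Proposed behaviour: only drop when the new timestamp is strictly
    // newer, so same-value/same-timestamp retries are forwarded again.
    static boolean shouldDropProposed(byte[] oldValue, long oldTs,
                                      byte[] newValue, long newTs) {
        return Arrays.equals(oldValue, newValue) && newTs > oldTs;
    }

    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        // Retry scenario: same value, same timestamp.
        System.out.println(shouldDropCurrent(a, 100L, a.clone(), 100L));  // true: lost
        System.out.println(shouldDropProposed(a, 100L, a.clone(), 100L)); // false: forwarded
    }
}
{code}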


> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12508
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12508
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Nico Habermann
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)