Posted to dev@kafka.apache.org by "Eno Thereska (JIRA)" <ji...@apache.org> on 2017/05/24 16:17:04 UTC

[jira] [Commented] (KAFKA-5315) Streams exception w/ partially processed record corrupts state store

    [ https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155 ] 

Eno Thereska commented on KAFKA-5315:
-------------------------------------

[~mjsax] isn't this a fundamental problem that exactly-once and transactions are supposed to solve? Falls into the general category of "produce to a bunch of topics + state store" atomically. What am I missing?
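
(For context on the exactly-once suggestion: a minimal config sketch, assuming a Streams version where the processing guarantee setting is available, i.e. 0.11.0+. The application id and broker address below are placeholders, not from this report.)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder application id and broker address.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-aggregation-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // With exactly-once enabled, output records, offset commits, and
            // changelog writes are committed in a single transaction, and local
            // task state is restored from the changelog after an unclean shutdown
            // rather than reused as-is.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            // These properties would then be passed to new KafkaStreams(topology, props).
        }
    }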

> Streams exception w/ partially processed record corrupts state store
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Mathieu Fenniak
>            Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is inserted into the state store before being forwarded to downstream processors, and it may persist in the state store even if downstream processing fails due to an exception.  The persisted state store record can then affect any attempt to resume processing after the exception.
> A specific example sequence of events in a simple topology: a single KTable source, a group-by on a field in the value, an aggregation that sums another field, and output to a topic (a Streams DSL sketch of this topology follows the quoted steps below) --
> 1. A new record (record A) is received by the source KTable, and put in the KTable RocksDB state store.
> 2. While processing record A, an exception occurs that prevents producing to Kafka (e.g., a TimeoutException: "Failed to update metadata after 60000 ms").
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the task.
> 7. The new thread initializes its state store for the KTable; because it is on the same host as the original thread, the store still contains the k/v for record A.
> 8. The new thread resumes consuming at the last committed offset, which is before record A.
> 9. When reprocessing record A, the new thread looks up record A's key and reads the value that was written to the state store in step #1.
> 10. The repartition map receives a Change with both an oldValue and a newValue, and forwards a Change(null, v) and a Change(v, null).
> 11. The aggregation ends up both subtracting and adding the value of record A, resulting in an incorrect & persistent output.
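
For anyone trying to reproduce this, here is a rough sketch of the quoted topology in the 0.10.2-era Streams DSL. Topic names, store names, and the "category,amount" value encoding are invented for illustration and are not from the original report.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class KTableAggregationSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-aggregation-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();

            // Source KTable; each value is assumed to be encoded as "category,amount".
            KTable<String, String> source = builder.table("source-topic", "source-store");

            // Group by a field of the value, then sum another field.
            KTable<String, Long> totals = source
                .groupBy((key, value) -> KeyValue.pair(value.split(",")[0], value))
                .aggregate(
                    () -> 0L,                                                                // initializer
                    (category, value, total) -> total + Long.parseLong(value.split(",")[1]), // adder
                    (category, value, total) -> total - Long.parseLong(value.split(",")[1]), // subtractor
                    Serdes.Long(),
                    "totals-store");

            // Output the aggregate to a topic.
            totals.to(Serdes.String(), Serdes.Long(), "totals-topic");

            new KafkaStreams(builder, props).start();
        }
    }

The adder/subtractor pair is what consumes the Change(null, v) and Change(v, null) forwarded by the repartition map in steps 10-11, which is why reprocessing record A against a stale state store ends up both subtracting and adding its value.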



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)