Posted to jira@kafka.apache.org by "Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2020/02/11 19:12:00 UTC

[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

    [ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034746#comment-17034746 ] 

Sophie Blee-Goldman commented on KAFKA-8037:
--------------------------------------------

[~pkleindl] [~vvcephei] [~mjsax] [~guozhang] I have kind of a different approach that might be a good compromise. We want to satisfy all of the following three goals as much as possible:
 # reuse the input topic as the changelog to avoid replicating all that data
 # make restoration as fast as possible by copying plain bytes without deserializing
 # don't load bad data, or copy bytes during restoration that weren't copied during normal processing

As this ticket points out, we currently sacrifice 3) for the sake of 1) and 2) with the workaround being to give up 1) by turning off optimization. But what if we instead created a new type of internal topic that's essentially an "inverse-changelog" – rather than sending every record that goes to the store to this inverse-changelog, we send only the records that _don't_ get put into the store? In fact we don't even store the entire record, just the key bytes with a null value. Then once we've restored from the input-topic-changelog, we read from the inverse-changelog and any bad records get deleted without ever having to deserialize or store the value twice.
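
To make the idea concrete, here is a minimal sketch in plain Java with no Kafka dependency. Everything in it is a hypothetical stand-in: `RawRecord`, `process`, `restore`, and the toy "valid only if it parses as an integer" deserializer are invented for illustration and are not Streams APIs. It also only covers the simple case where each key appears once with a non-null value; how the two topics would be ordered relative to each other when a key has both valid and corrupted records is not addressed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InverseChangelogSketch {

    /** Hypothetical raw topic record; a String key stands in for real key bytes. */
    static final class RawRecord {
        final String key;
        final byte[] value; // null means "key only", as in the inverse-changelog

        RawRecord(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Toy deserializer (assumption): a value is valid only if it parses as an integer. */
    static boolean isDeserializable(byte[] value) {
        try {
            Integer.parseInt(new String(value, StandardCharsets.UTF_8));
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Normal processing: store valid records, append only the key of each dropped record. */
    static Map<String, byte[]> process(List<RawRecord> input, List<RawRecord> inverseChangelog) {
        Map<String, byte[]> store = new HashMap<>();
        for (RawRecord r : input) {
            if (isDeserializable(r.value)) {
                store.put(r.key, r.value);
            } else {
                inverseChangelog.add(new RawRecord(r.key, null));
            }
        }
        return store;
    }

    /** Restoration: copy raw bytes from the input topic, then replay the inverse-changelog as deletes. */
    static Map<String, byte[]> restore(List<RawRecord> input, List<RawRecord> inverseChangelog) {
        Map<String, byte[]> store = new HashMap<>();
        for (RawRecord r : input) {
            store.put(r.key, r.value); // no deserialization here
        }
        for (RawRecord r : inverseChangelog) {
            store.remove(r.key); // only the key is needed
        }
        return store;
    }

    public static void main(String[] args) {
        List<RawRecord> input = new ArrayList<>();
        input.add(new RawRecord("a", "1".getBytes(StandardCharsets.UTF_8)));
        input.add(new RawRecord("b", "oops".getBytes(StandardCharsets.UTF_8)));
        input.add(new RawRecord("c", "3".getBytes(StandardCharsets.UTF_8)));

        List<RawRecord> inverseChangelog = new ArrayList<>();
        Map<String, byte[]> live = process(input, inverseChangelog);
        Map<String, byte[]> restored = restore(input, inverseChangelog);

        // Compare the key sets of the live store and the restored store.
        System.out.println(restored.keySet().equals(live.keySet()));
    }
}
```

Note that `restore` never calls the deserializer, which is the point of goal 2), and the inverse-changelog holds one key per dropped record rather than a copy of the data, which keeps goal 1) mostly intact.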

My one concern is over the tracking/handling of local stream time: if we load the bad data during the initial restoration, it might bump up the stream time when it shouldn't have and potentially cause older, valid records to get dropped. If that's a real concern, then this approach would be blocked on KAFKA-9368 – however, I'm not confident that corrupted records don't currently bump the stream-time even during normal processing, and I'm also not sure what kind of guarantees we should or do make w.r.t. deserialization exceptions.
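
A toy illustration of that stream-time concern, again in plain Java. It assumes stream time is simply the maximum timestamp observed so far, and it uses a made-up drop rule (`isExpired` with a hypothetical `retentionMs`); neither method is an actual Streams API, and the real behavior depends on the store type.

```java
public class StreamTimeSketch {

    /** Assumed model: stream time is the maximum record timestamp observed so far. */
    static long advance(long streamTime, long recordTimestamp) {
        return Math.max(streamTime, recordTimestamp);
    }

    /** Hypothetical drop rule: a record is expired if it is older than streamTime - retentionMs. */
    static boolean isExpired(long streamTime, long recordTimestamp, long retentionMs) {
        return recordTimestamp < streamTime - retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 100L;
        long validTs = 1_000L;     // a valid record
        long corruptedTs = 5_000L; // a corrupted record with a much later timestamp
        long laterValidTs = 1_050L; // a valid record that arrives afterwards

        // Stream time if the corrupted record is skipped entirely.
        long skipped = advance(0L, validTs);
        // Stream time if the corrupted record is loaded and counted.
        long loaded = advance(advance(0L, validTs), corruptedTs);

        System.out.println("dropped when bad record is skipped: "
                + isExpired(skipped, laterValidTs, retentionMs));
        System.out.println("dropped when bad record is loaded:  "
                + isExpired(loaded, laterValidTs, retentionMs));
    }
}
```

Under these assumptions, the later valid record is kept when the corrupted record never touches stream time and is dropped when it does, which is the failure mode described above.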

Thoughts?

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a `deserialization.exception.handler` to drop corrupted records on read. However, this mechanism may be bypassed on restore. Assume a `builder.table()` call reads and drops a corrupted record. If the table state is lost and restored from the changelog topic, the corrupted record may be copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the record would fail with a deserialization exception if the value part cannot be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for GlobalKTable case). It's unclear to me atm, how this issue could be addressed for KTables though.
> Note that user state stores are not affected, because they always have a dedicated changelog topic (and don't reuse an input topic) and thus the corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)