Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/06/19 00:43:00 UTC

[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

    [ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075 ] 

Matthias J. Sax edited comment on KAFKA-10179 at 6/19/20, 12:42 AM:
--------------------------------------------------------------------

{quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), and then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic.

For this case, the source-topic-changelog optimization does not apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new schema using the changelog topic name. Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic, and thus we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is an inefficiency we pay, as we also need to send the deserialized data downstream for further processing).
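
For reference, this is the setup in code; a minimal sketch with made-up application id, topic, and store names, and optimization enabled, which is what triggers the source-topic-changelog reuse:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // made-up application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// enable topology optimizations so the source topic can be reused as the store's changelog
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

final StreamsBuilder builder = new StreamsBuilder();
// with the optimization applied, no "my-app-input-store-changelog" topic is created;
// the store is restored directly from "input-topic"
final KTable<String, String> table = builder.table(
    "input-topic",
    Materialized.as("input-store"));

// props must be passed to build() so the optimization is actually applied
final Topology topology = builder.build(props);
{code}
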
{quote}Another issue that I think exists (I need to try to reproduce it) and that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error.
{quote}
This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037
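
For completeness, the "skip on deserialization errors" setup the quote refers to is the built-in handler config; a minimal sketch (the comments describe the restore-path gap tracked in KAFKA-8037):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

final Properties props = new Properties();
// records that fail deserialization on the source topic are logged and skipped during normal processing
props.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
// however, restoring an optimized source table copies the raw bytes into the store without
// deserializing them, so such a record can still surface as an error when the store is read later
{code}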


> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-10179
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10179
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes actually use the topic name for (de)serialization, e.g., when Kafka Streams is used with Confluent's Schema Registry, an {{org.apache.kafka.common.errors.SerializationException}} is thrown.
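
For illustration, the topic name matters only to topic-aware serdes; a hypothetical serializer in the style of the Schema Registry serdes (class name and registry lookup are made up) shows why a wrong changelog topic name can surface as a {{SerializationException}}:

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical serializer that, like Confluent's Schema Registry serdes, derives a
// schema subject from the topic name it is handed.
public class TopicAwareSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(final String topic, final String data) {
        // With the bug, MeteredKeyValueStore passes "<application ID>-<store name>-changelog"
        // here even though the optimized table's changelog is the source topic, so the
        // derived subject does not match what is registered for the source topic.
        final String subject = topic + "-value";
        if (!subjectExists(subject)) { // stand-in for a registry lookup
            throw new SerializationException("Unknown subject: " + subject);
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }

    private boolean subjectExists(final String subject) {
        return false; // placeholder; a real implementation would query the registry
    }
}
{code}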



--
This message was sent by Atlassian Jira
(v8.3.4#803005)