Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/01/01 18:58:00 UTC

[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

    [ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006463#comment-17006463 ] 

Guozhang Wang commented on KAFKA-7663:
--------------------------------------

I thought about the opt-in optimization if we follow option 3), which basically needs to look into the Processor and check whether it is an instance of our internal processor, which has no custom logic and just applies the upstream records to the state store. I feel it "might" work, but without fully working it out I cannot say for sure. If it does work, I'm slightly leaning towards it, mainly because it does not require API changes, whereas with option 1) we'd best remove the ProcessorSupplier from the parameters to completely forbid users from trying to customize it.

If we want to fix it asap (which I think we should, as we are touching this piece right now anyway), then maybe we can first see whether the above opt-in can indeed work nicely, and if not, then maybe consider option 3).
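
To make the opt-in idea concrete, here is a minimal sketch of the check described above. It does not use the Kafka Streams API: StoreUpdater, PassThroughUpdater, SwapUpdater and both restore methods are hypothetical stand-ins, and the topic is modeled as a plain in-memory map, so this only illustrates the control flow and not how GlobalStateManagerImpl would actually be changed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of this is Kafka Streams API.
class GlobalRestoreSketch {

    /** Stand-in for a processor that maintains a global store. */
    interface StoreUpdater {
        void process(String key, String value, Map<String, String> store);
    }

    /** Stand-in for the internal processor: applies records to the store unchanged. */
    static final class PassThroughUpdater implements StoreUpdater {
        @Override
        public void process(String key, String value, Map<String, String> store) {
            store.put(key, value);
        }
    }

    /** The reporter's use case: persist V,K instead of K,V. */
    static final class SwapUpdater implements StoreUpdater {
        @Override
        public void process(String key, String value, Map<String, String> store) {
            store.put(value, key);
        }
    }

    /** Restore as described in the report: topic records are copied verbatim. */
    static Map<String, String> restoreBypassingProcessor(Map<String, String> topic) {
        return new LinkedHashMap<>(topic);
    }

    /** Opt-in: take the verbatim copy only when the updater is known to be pass-through. */
    static Map<String, String> restoreWithOptIn(Map<String, String> topic, StoreUpdater updater) {
        if (updater instanceof PassThroughUpdater) {
            return restoreBypassingProcessor(topic);
        }
        // Custom logic present: replay every record through the supplied updater.
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : topic.entrySet()) {
            updater.process(record.getKey(), record.getValue(), store);
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, String> topic = new LinkedHashMap<>();
        topic.put("k1", "v1");
        topic.put("k2", "v2");
        System.out.println(restoreBypassingProcessor(topic));                   // {k1=v1, k2=v2}
        System.out.println(restoreWithOptIn(topic, new SwapUpdater()));         // {v1=k1, v2=k2}
        System.out.println(restoreWithOptIn(topic, new PassThroughUpdater()));  // {k1=v1, k2=v2}
    }
}
```

The first line of output mirrors the reported behavior (the custom mapping is lost on restore), while the second shows the store a custom processor would expect. Whether an instanceof check like this is robust enough in the real code base is exactly the part that still needs to be worked out.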

> Custom Processor supplied on addGlobalStore is not used when restoring state from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamBuilder#{{addGlobalStore}} call supplying a custom processor responsible for transforming each K,V record from the input stream into a V,K record. It works fine and my {{store.all()}} does print the correct persisted V,K records. However, if I clean the local store and restart the stream app, the global table is reloaded without going through the supplied processor; instead, it calls {{GlobalStateManagerImp#restoreState}}, which simply stores the input topic K,V records into RocksDB (hence bypassing the mapping function of my custom processor). I believe this is not the expected result?
>  This is a follow-up to a Stack Overflow discussion about storing a K,V topic as a global table with some stateless transformations based on a "custom" processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  


