Posted to jira@kafka.apache.org by "Patrik Kleindl (JIRA)" <ji...@apache.org> on 2019/03/08 10:28:00 UTC

[jira] [Commented] (KAFKA-7663) Custom Processor supplied on addGlobalStore is not used when restoring state from topic

    [ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787760#comment-16787760 ] 

Patrik Kleindl commented on KAFKA-7663:
---------------------------------------

I took a short look at this issue while working on https://issues.apache.org/jira/browse/KAFKA-8037, because in that work I added access to the sourceNode and its deserializer during GlobalStateManagerImpl.restoreState.
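According to the note at the end of the issue description, KAFKA-8037 concerns applying `default.deserialization.exception.handler` during restore. The following is a minimal, self-contained sketch of that idea. It is not the actual patch and not Kafka code. The class `RestoreWithDeserializer`, its `rec` and `restore` methods, and the `HandlerResponse` enum are hypothetical. The enum only mirrors the CONTINUE/FAIL choice of Kafka's `DeserializationExceptionHandler`, and deserializers are modeled as plain `Function<byte[], T>` values rather than Kafka's `Deserializer` interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of restoring a global store from raw records.
// NOT Kafka's GlobalStateManagerImpl; all names here are illustrative.
final class RestoreWithDeserializer {

    // Mirrors the CONTINUE / FAIL choice of Kafka's DeserializationExceptionHandler.
    enum HandlerResponse { CONTINUE, FAIL }

    // Builds a raw {key, value} byte pair, as a consumer would hand it to the restore loop.
    static byte[][] rec(String key, String value) {
        return new byte[][] {
            key.getBytes(StandardCharsets.UTF_8),
            value.getBytes(StandardCharsets.UTF_8)
        };
    }

    // Deserializes each raw pair with the (modeled) source-node deserializers and
    // asks the handler what to do when deserialization throws.
    static <K, V> Map<K, V> restore(List<byte[][]> raw,
                                    Function<byte[], K> keyDeserializer,
                                    Function<byte[], V> valueDeserializer,
                                    Function<RuntimeException, HandlerResponse> handler) {
        Map<K, V> store = new LinkedHashMap<>();
        for (byte[][] pair : raw) {
            try {
                K key = keyDeserializer.apply(pair[0]);
                V value = valueDeserializer.apply(pair[1]);
                store.put(key, value);
            } catch (RuntimeException e) {
                if (handler.apply(e) == HandlerResponse.FAIL) {
                    throw e; // abort the restore
                }
                // CONTINUE: skip the corrupt record and keep restoring
            }
        }
        return store;
    }
}
```

With a value deserializer that parses integers, a record whose value is `"oops"` is skipped when the handler returns CONTINUE, and it aborts the restore with a `NumberFormatException` when the handler returns FAIL.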

If I try to call process() from restoreState, it fails because the topology has not been initialized yet.

Topology initialization currently happens in GlobalStateUpdateTask.initialize, which is only called after the restore has finished.

If I try to replicate this in GlobalStateManagerImpl.initialize by initializing the topology before stateStore.init(processorContext, stateStore), then initializing the state store fails, because that is not allowed once the topology has been initialized.
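The ordering conflict described above can be reduced to a tiny state model. This is a hypothetical sketch, not Kafka code: `GlobalInitOrder` and its methods are invented names, and the model assumes (as the code paths named above suggest) that the restore runs while the store is being initialized.

```java
// Hypothetical model of the initialization-order conflict; not Kafka code.
// Assumption: restoring is triggered from inside store initialization.
final class GlobalInitOrder {
    private boolean topologyInitialized = false;

    // Stand-in for initializing the global topology (processor nodes).
    void initTopology() {
        topologyInitialized = true;
    }

    // Stand-in for stateStore.init(...), which in this model triggers the restore.
    void initStore() {
        if (topologyInitialized) {
            throw new IllegalStateException("store init not allowed after topology init");
        }
        restoreThroughProcessor();
    }

    // Restoring via the custom processor needs an initialized topology.
    private void restoreThroughProcessor() {
        if (!topologyInitialized) {
            throw new IllegalStateException("topology not initialized yet");
        }
    }

    // Runs one initialization order and returns the failure message, or "ok".
    static String attempt(boolean topologyFirst) {
        GlobalInitOrder model = new GlobalInitOrder();
        try {
            if (topologyFirst) {
                model.initTopology();
                model.initStore();
            } else {
                model.initStore();
                model.initTopology();
            }
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

`attempt(false)` fails with "topology not initialized yet" and `attempt(true)` fails with "store init not allowed after topology init", so neither order reaches "ok". The model only shows why both orders fail as-is; it does not propose a fix.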

 

> Custom Processor supplied on addGlobalStore is not used when restoring state from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a global store via StreamsBuilder#{{addGlobalStore}}, supplying a custom processor responsible for transforming each K,V record from the input stream into a V,K record. This works fine, and my {{store.all()}} prints the correct persisted V,K records. However, if I clean the local store and restart the streams app, the global table is reloaded without going through the supplied processor. Instead, {{GlobalStateManagerImpl#restoreState}} is called, which simply writes the input topic's K,V records into RocksDB, bypassing the mapping function of my custom processor. I believe this is not the expected behavior.
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic as a global table with some stateless transformations applied by a "custom" processor added to the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply `default.deserialization.exception.handler` during restore (cf. KAFKA-8037).
>  
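The reported behavior can be illustrated with a stripped-down model of the two write paths into the global store. This is a hypothetical sketch, not Kafka code: `GlobalStoreModel`, `processLive`, `restoreRaw`, and `swapProcessor` are invented names, strings stand in for serialized records, and a `Map` stands in for the RocksDB-backed store.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two write paths into a global store; not Kafka code.
// Strings stand in for serialized records and a Map stands in for RocksDB.
final class GlobalStoreModel {

    // Stand-in for the custom processor: writes each K,V input record as V,K.
    private static void swapProcessor(Map<String, String> store, String key, String value) {
        store.put(value, key);
    }

    // Normal processing: every input record passes through the processor.
    static Map<String, String> processLive(List<String[]> input) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] kv : input) {
            swapProcessor(store, kv[0], kv[1]);
        }
        return store;
    }

    // Restore as described in the report: input records are copied verbatim,
    // so the processor's K,V -> V,K mapping is skipped.
    static Map<String, String> restoreRaw(List<String[]> input) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] kv : input) {
            store.put(kv[0], kv[1]);
        }
        return store;
    }
}
```

For the input `[(k1, v1), (k2, v2)]`, `processLive` yields `{v1=k1, v2=k2}` while `restoreRaw` yields `{k1=v1, k2=v2}`, so a store that was wiped and restored no longer matches what the processor had written.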



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)