Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/03/04 21:28:00 UTC
[jira] [Updated] (KAFKA-7663) Custom Processor supplied on
addGlobalStore is not used when restoring state from topic
[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-7663:
-----------------------------------
Description:
I have implemented a StreamsBuilder#{{addGlobalStore}} call supplying a custom processor responsible for transforming each K,V record from the input stream into a V,K record. It works fine, and my {{store.all()}} does print the correct persisted V,K records. However, if I clean the local store and restart the stream app, the global table is reloaded without going through the supplied processor; instead, it calls {{GlobalStateManagerImpl#restoreState}}, which simply stores the input topic K,V records into RocksDB (hence bypassing the mapping function of my custom processor). I believe this is not the expected result?
This is a follow-up to a Stack Overflow discussion about storing a K,V topic as a global table with some stateless transformations based on a "custom" processor added on the global store:
[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
If we address this issue, we should also apply `default.deserialization.exception.handler` during restore.
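To make the reported behaviour concrete, here is a minimal plain-Java simulation of the two code paths. It is only a sketch under assumptions: it does not use the Kafka Streams API, the class and method names ({{GlobalStoreRestoreSketch}}, {{processLive}}, {{restoreBypassingProcessor}}) are hypothetical, and the topic and the store are modelled as an in-memory list and map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the report; none of these names are Kafka Streams APIs.
class GlobalStoreRestoreSketch {

    // Stand-in for the custom processor: stores each K,V record as V,K.
    static void swappingProcessor(Map.Entry<String, String> record, Map<String, String> store) {
        store.put(record.getValue(), record.getKey());
    }

    // Normal processing path: every topic record goes through the processor.
    static Map<String, String> processLive(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : topic) {
            swappingProcessor(record, store);
        }
        return store;
    }

    // Restore path as described in the report: records are copied as-is,
    // so the processor's mapping is never applied.
    static Map<String, String> restoreBypassingProcessor(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : topic) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        // Hypothetical topic content used only for illustration.
        List<Map.Entry<String, String>> topic =
                List.of(Map.entry("k1", "v1"), Map.entry("k2", "v2"));

        System.out.println("after live processing: " + processLive(topic));
        System.out.println("after restore:         " + restoreBypassingProcessor(topic));
    }
}
```

Running it shows the store holding V,K entries after live processing but K,V entries after the simulated restore, which is the mismatch described above.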
was:
I have implemented a StreamsBuilder#{{addGlobalStore}} call supplying a custom processor responsible for transforming each K,V record from the input stream into a V,K record. It works fine, and my {{store.all()}} does print the correct persisted V,K records. However, if I clean the local store and restart the stream app, the global table is reloaded without going through the supplied processor; instead, it calls {{GlobalStateManagerImpl#restoreState}}, which simply stores the input topic K,V records into RocksDB (hence bypassing the mapping function of my custom processor). I believe this is not the expected result?
this is a follow up on stackoverflow discussion around storing a K,V topic as a global table with some stateless transformations based on a "custom" processor added on the global store:
[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> Custom Processor supplied on addGlobalStore is not used when restoring state from topic
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-7663
> URL: https://issues.apache.org/jira/browse/KAFKA-7663
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Frederic Tardif
> Priority: Major
> Attachments: image-2018-11-20-11-42-14-697.png
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)