Posted to dev@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2021/08/06 09:55:00 UTC

[jira] [Created] (FLINK-23663) Reduce state size in ChangelogNormalize through filter push down

Timo Walther created FLINK-23663:
------------------------------------

             Summary: Reduce state size in ChangelogNormalize through filter push down
                 Key: FLINK-23663
                 URL: https://issues.apache.org/jira/browse/FLINK-23663
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Timo Walther


{{ChangelogNormalize}} is an expensive stateful operation as it stores data for each key. 
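For illustration, a minimal sketch of where the node shows up (the {{orders}} table, its schema, and the connector options are hypothetical; the exact plan output depends on the Flink version and planner):

{code:sql}
-- Hypothetical upsert-kafka table with a declared primary key.
CREATE TABLE orders (
  order_id STRING,
  a INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

-- The physical plan for a query on such an upsert source contains a
-- ChangelogNormalize node that keeps the latest row per order_id in
-- state, i.e. state grows with the size of the key space.
EXPLAIN SELECT * FROM orders;
{code}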

Filters are generally not pushed through a {{ChangelogNormalize}} node, which means users have no way to at least limit the key space. Pushing a filter like {{a < 10}} into a source like {{upsert-kafka}} that emits {{+I[key1, a=9]}} and {{-D[key1, a=10]}} is problematic, because the deletion would be filtered out and lead to wrong results. However, limiting the filter push down to predicates on the key columns should be safe.
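To make the two cases concrete, a sketch using the hypothetical {{orders}} table defined above (the values follow the example in this description):

{code:sql}
-- Unsafe: a < 10 must not be pushed into the upsert-kafka source.
-- If the source emits +I[key1, a=9] followed by -D[key1, a=10], the
-- deletion would be filtered out before ChangelogNormalize and the
-- materialized result would wrongly keep key1.
SELECT * FROM orders WHERE a < 10;

-- Safe: a predicate on the primary key only shrinks the key space.
-- All changelog entries of the remaining keys still reach
-- ChangelogNormalize, so results stay correct while state is reduced.
SELECT * FROM orders WHERE order_id IN ('key1', 'key2');
{code}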

Furthermore, the current implementation also appears to be incorrect: it pushes filters through {{ChangelogNormalize}}, but only if the source implements filter push down.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)