Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/04/26 08:39:00 UTC

[jira] [Commented] (FLINK-27411) Move lookup table source cache logic to flink-table-runtime module

    [ https://issues.apache.org/jira/browse/FLINK-27411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528011#comment-17528011 ] 

Martijn Visser commented on FLINK-27411:
----------------------------------------

[~smiralex] Thanks for opening this ticket. The concept of a lookup is currently only known in the Table/SQL context; could we generalize such a solution so that it can also become available for DataStream users? 

> Move lookup table source cache logic to flink-table-runtime module
> ------------------------------------------------------------------
>
>                 Key: FLINK-27411
>                 URL: https://issues.apache.org/jira/browse/FLINK-27411
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase, Connectors / JDBC, Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: Alexander Smirnov
>            Priority: Major
>         Attachments: LookupJoin(2).png
>
>
> The idea was inspired by [FLIP-221|https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric]. [~renqs] and Yuan Zhu have done great work on it. However, I suggest implementing it in a slightly different way, which allows further caching optimizations and requires fewer dependencies in connectors.
> The point is to move the caching logic to a lower level, the level of the flink-table-runtime operators. The architecture of the lookup join looks like this (red text is a schematic representation of the proposed changes):
> !LookupJoin(2).png|width=589,height=264!
> LookupConfig is named this way (rather than CacheConfig) because it can also contain non-cache options for the lookup join (for example, 'lookup.max-retries', 'lookup.async'...).
> Changes in connectors - remove their own logic for configs, caching and retrying queries.
> Changes in public "Table SQL / API" - a new class LookupConfig, new ConfigOptions for lookup connectors and a new method 'getLookupConfig' in LookupTableSource.
> {code:java}
> @PublicEvolving
> public interface LookupTableSource extends DynamicTableSource {
>     ...
>     /** @return configurations for planning the lookup join and executing it at runtime. */
>     default LookupConfig getLookupConfig() {
>         return null;
>     }
>     ...
> }{code}
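To make the proposed LookupConfig more concrete, here is a minimal plain-Java sketch of what such a holder could contain. Only the option names 'lookup.max-retries' and 'lookup.async' come from the text above; the CacheStrategy enum, the cache fields and the validation rules are hypothetical illustrations, not Flink API.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical cache strategies; 'ALL' is the strategy mentioned in the original FLIP.
enum CacheStrategy { NONE, LRU, ALL }

// Hypothetical immutable holder for lookup-join settings. It also carries
// non-cache options, which is why it is not called CacheConfig.
final class LookupConfig {
    final boolean async;                // corresponds to 'lookup.async'
    final int maxRetries;               // corresponds to 'lookup.max-retries'
    final CacheStrategy cacheStrategy;  // hypothetical
    final long cacheMaxRows;            // hypothetical, used by LRU only
    final Duration cacheTtl;            // hypothetical expire-after-write

    LookupConfig(boolean async, int maxRetries, CacheStrategy cacheStrategy,
                 long cacheMaxRows, Duration cacheTtl) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        if (cacheStrategy == CacheStrategy.LRU && cacheMaxRows <= 0) {
            throw new IllegalArgumentException("an LRU cache needs cacheMaxRows > 0");
        }
        this.async = async;
        this.maxRetries = maxRetries;
        this.cacheStrategy = Objects.requireNonNull(cacheStrategy, "cacheStrategy");
        this.cacheMaxRows = cacheMaxRows;
        this.cacheTtl = Objects.requireNonNull(cacheTtl, "cacheTtl");
    }

    // A planner could use this to choose between a plain and a caching runner.
    boolean cachingEnabled() {
        return cacheStrategy != CacheStrategy.NONE;
    }
}
```

Keeping these settings in one object would let the planner select the runner (sync or async, with or without caching) in a single place, while the connector only declares its options.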
> Changes in "Table SQL / Planner" - class CommonPhysicalLookupJoin and its subclasses.
> Changes in "Table SQL / Runtime" - new classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner and AsyncLookupJoinCachingRunnerWithCalc. We could probably use the decorator pattern here to avoid code duplication and a large number of classes, but in our private version the design is like this (perhaps not very elegant).
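The decorator idea mentioned above can be sketched in plain Java. The Lookup interface and both decorator classes are hypothetical stand-ins (Flink's real runners work on RowData and a Collector); the sketch only shows that caching and retrying ('lookup.max-retries') could be written once and stacked onto any lookup, instead of being duplicated across the sync/async and with/without-calc runner classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal lookup abstraction: join key -> matching rows of the lookup table.
interface Lookup<K, R> {
    List<R> lookup(K key);
}

// Decorator that puts a bounded LRU cache in front of any Lookup.
final class CachingLookup<K, R> implements Lookup<K, R> {
    private final Lookup<K, R> delegate;
    private final Map<K, List<R>> cache;

    CachingLookup(Lookup<K, R> delegate, int maxEntries) {
        this.delegate = delegate;
        // accessOrder = true orders entries from least to most recently accessed,
        // so removing the eldest entry gives LRU eviction.
        this.cache = new LinkedHashMap<K, List<R>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<R>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public List<R> lookup(K key) {
        List<R> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<R> rows = delegate.lookup(key);
        cache.put(key, rows); // empty results are cached too, so misses are not re-queried
        return rows;
    }
}

// Decorator that retries a failing lookup up to maxRetries extra times.
final class RetryingLookup<K, R> implements Lookup<K, R> {
    private final Lookup<K, R> delegate;
    private final int maxRetries;

    RetryingLookup(Lookup<K, R> delegate, int maxRetries) {
        this.delegate = delegate;
        this.maxRetries = maxRetries;
    }

    @Override
    public List<R> lookup(K key) {
        for (int attempt = 0; ; attempt++) {
            try {
                return delegate.lookup(key);
            } catch (RuntimeException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: propagate the last failure
                }
            }
        }
    }
}
```

With this shape the runtime, not the connector, would decide the stacking, e.g. `new CachingLookup<>(new RetryingLookup<>(connectorLookup, maxRetries), maxRows)`, where connectorLookup is a hypothetical name for whatever the connector provides. An async variant would need the same decorators over a CompletableFuture-based interface.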
> With this architecture we can apply further caching optimizations:
> 1) Caching after calculations. LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc and AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc function contains computations on fields of the lookup table; most of the time these are filters and projections.
> For example, if we join table A with lookup table B using the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters 'A.age = B.age + 10' and 'B.salary > 1000'.
> If we apply this function before storing records in the cache, the cache size will be significantly reduced: filters avoid storing useless records, and projections reduce the size of each record. As a result, the user can raise the maximum number of records in the cache for the same memory budget.
> 2) Constant keys optimization. If the join condition contains constants, for example 'JOIN … ON A.name = B.name AND B.age = 10', we don't need to store '10' in the cache. Currently the TableFunction's 'eval' method is called with the values 'A.name' and 10, so '10' is stored in the cache for every row, which is useless.
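Both optimizations in the list above can be sketched together in one hypothetical cache class (not Flink code). Two assumptions are made explicitly: the calc reads only columns of the lookup table, never the probe row, so it can be evaluated once per cached key; and each join operator has its own cache, so a constant such as B.age = 10 is the same for every entry and can be left out of the cache key while still being sent to the external system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical cache combining both optimizations:
// 1) the calc (filter + projection on lookup-table columns) runs before rows are cached;
// 2) key positions bound to constants are left out of the cache key.
final class OptimizedLookupCache<R, P> {
    private final Function<List<Object>, List<R>> source; // queries the lookup table with the full key
    private final Predicate<R> filter;                     // e.g. B.salary > 1000
    private final Function<R, P> projection;               // keeps only the columns the query reads
    private final boolean[] isConstant;                    // true where the key is a literal, e.g. B.age = 10
    private final Map<List<Object>, List<P>> cache = new HashMap<>(); // unbounded for brevity

    OptimizedLookupCache(Function<List<Object>, List<R>> source, Predicate<R> filter,
                         Function<R, P> projection, boolean[] isConstant) {
        this.source = source;
        this.filter = filter;
        this.projection = projection;
        this.isConstant = isConstant.clone();
    }

    // evalArgs mirrors the arguments passed to the TableFunction's eval, e.g. ("Alice", 10).
    List<P> lookup(Object... evalArgs) {
        List<Object> cacheKey = new ArrayList<>();
        for (int i = 0; i < evalArgs.length; i++) {
            if (!isConstant[i]) {
                cacheKey.add(evalArgs[i]); // constants never vary, so they are not part of the key
            }
        }
        return cache.computeIfAbsent(cacheKey, k -> {
            List<P> kept = new ArrayList<>();
            // The external query still receives the constant values.
            for (R row : source.apply(Arrays.asList(evalArgs))) {
                if (filter.test(row)) {
                    kept.add(projection.apply(row)); // only filtered, projected rows are stored
                }
            }
            return kept;
        });
    }

    // Total number of rows held in the cache, to show the effect of the filter.
    int cachedRowCount() {
        int n = 0;
        for (List<P> rows : cache.values()) {
            n += rows.size();
        }
        return n;
    }
}
```

If the calc also referenced probe-side columns (as 'A.age = B.age + 10' does), that part could not be applied before caching in this sketch and would have to stay in the per-row join condition.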
> Note that this change does not cover the Hive lookup connector, because it stores all data in memory. In the future, this logic could be replaced by the 'ALL' cache strategy mentioned in the original FLIP.
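For comparison, an 'ALL' strategy would keep the entire lookup table in memory, matching the description above of the Hive connector storing all data in memory. The sketch below is a hypothetical single-threaded illustration: the full-scan supplier, the key extractor, the reload interval and the injectable clock are all assumptions, and a real implementation would also need to handle concurrent reloads and memory limits.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical 'ALL' cache: the whole lookup table is held in memory and
// rebuilt after reloadInterval; lookups never query the external system.
final class FullTableCache<K, R> {
    private final Supplier<List<R>> scanAll;   // hypothetical full scan of the lookup table
    private final Function<R, K> keyOf;        // extracts the join key from a row
    private final long reloadIntervalNanos;
    private final LongSupplier nanoClock;      // e.g. System::nanoTime; injectable for tests
    private Map<K, List<R>> index = Collections.emptyMap();
    private long loadedAt;
    private boolean loaded;

    FullTableCache(Supplier<List<R>> scanAll, Function<R, K> keyOf,
                   Duration reloadInterval, LongSupplier nanoClock) {
        this.scanAll = scanAll;
        this.keyOf = keyOf;
        this.reloadIntervalNanos = reloadInterval.toNanos();
        this.nanoClock = nanoClock;
    }

    List<R> lookup(K key) {
        long now = nanoClock.getAsLong();
        if (!loaded || now - loadedAt >= reloadIntervalNanos) {
            reload(now);
        }
        return index.getOrDefault(key, Collections.emptyList());
    }

    // Builds a fresh key -> rows index from a full scan and swaps it in.
    private void reload(long now) {
        Map<K, List<R>> fresh = new HashMap<>();
        for (R row : scanAll.get()) {
            fresh.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        index = fresh;
        loadedAt = now;
        loaded = true;
    }
}
```

The trade-off versus an LRU cache is that every key is served from memory with no per-key queries, at the cost of holding the whole table and seeing updates only after the next reload.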


