Posted to issues@flink.apache.org by "Alexander Smirnov (Jira)" <ji...@apache.org> on 2022/04/26 08:36:00 UTC

[jira] [Created] (FLINK-27411) Move lookup table source cache logic to flink-table-runtime module

Alexander Smirnov created FLINK-27411:
-----------------------------------------

             Summary: Move lookup table source cache logic to flink-table-runtime module
                 Key: FLINK-27411
                 URL: https://issues.apache.org/jira/browse/FLINK-27411
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / HBase, Connectors / JDBC, Table SQL / API, Table SQL / Planner, Table SQL / Runtime
    Affects Versions: 1.16.0
            Reporter: Alexander Smirnov
         Attachments: LookupJoin(2).png

The idea was inspired by FLIP [https://cwiki.apache.org/confluence/display/FLINK/FLI..|https://vk.com/away.php?to=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FFLINK%2FFLIP-221%2BAbstraction%2Bfor%2Blookup%2Bsource%2Bcache%2Band%2Bmetric&cc_key=]. [~renqs] and Yuan Zhu have done great work on this. However, I suggest implementing it in a slightly different way that allows applying optimizations to caching and requires fewer dependencies in connectors.

The point is to move the caching logic to a lower level: the level of flink-table-runtime operators. The architecture of lookup join looks like this (red text is a schematic representation of the proposed changes):
!LookupJoin(2).png|width=589,height=264!

LookupConfig is named this way (rather than CacheConfig) because it can also contain non-cache options for lookup join (for example, 'lookup.max-retries', 'lookup.async'...).

Changes in connectors - remove their own logic for configs, caching, and retrying queries.
Changes in public "Table SQL / API" - new class LookupConfig, new ConfigOptions for lookup connectors, and a new method 'getLookupConfig' in LookupTableSource.
{code:java}
@PublicEvolving
public interface LookupTableSource extends DynamicTableSource {
    ...

    /** @return configurations for planning lookup join and executing it in runtime. */
    default LookupConfig getLookupConfig() {
        return null;
    }

    ...
}{code}
Changes in "Table SQL / Planner" - class CommonPhysicalLookupJoin and its subclasses.
Changes in "Table SQL / Runtime" - classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner, AsyncLookupJoinCachingRunnerWithCalc. We could probably use the 'decorator' pattern here to avoid code duplication and a large number of classes, but in our private version the design is like this (maybe not so elegant).
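
To illustrate the 'decorator' idea mentioned above, here is a minimal sketch. It is not Flink code: the interface LookupRunner and the class CachingLookupRunner are hypothetical stand-ins, the real runners are Flink functions with a different signature, and the cache here is an unbounded HashMap without eviction or TTL. It only shows how one caching wrapper could serve any underlying runner instead of one caching class per runner variant.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a lookup runner: maps a lookup key to the matching rows. */
interface LookupRunner {
    List<String> lookup(String key);
}

/** Hypothetical decorator that adds caching on top of any LookupRunner. */
final class CachingLookupRunner implements LookupRunner {
    private final LookupRunner delegate;
    // Assumption: unbounded cache, for brevity only; a real one needs size and TTL limits.
    private final Map<String, List<String>> cache = new HashMap<>();

    CachingLookupRunner(LookupRunner delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<String> lookup(String key) {
        // The wrapped runner is queried only on a cache miss.
        return cache.computeIfAbsent(key, delegate::lookup);
    }
}

final class DecoratorDemo {
    public static void main(String[] args) {
        int[] externalCalls = {0};
        // A fake "external system" that counts how often it is queried.
        LookupRunner plain = key -> {
            externalCalls[0]++;
            return Collections.singletonList(key + "-row");
        };
        LookupRunner cached = new CachingLookupRunner(plain);
        cached.lookup("a");
        cached.lookup("a");
        System.out.println("external lookups: " + externalCalls[0]); // prints "external lookups: 1"
    }
}
```

With this shape, a runner with 'calc' would be just another LookupRunner passed to the same wrapper, so the caching code exists once.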

With such an architecture we can apply further optimizations to caching:
1) Caching after calculations. LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc and AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc function contains calculations on fields of the lookup table, and most of the time these calculations are filters and projections.
For example, if we join table A with lookup table B using the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters 'A.age = B.age + 10 and B.salary > 1000'.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the user can increase the initial maximum number of records in the cache.

2) Constant keys optimization. If the join condition contains constants, for example 'JOIN … ON A.name = B.name AND B.age = 10', we don't need to store '10' in the cache. Currently TableFunction's 'eval' method is called with the values 'A.name' and 10, so we store '10' in the cache for every row, which is pretty useless.
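
Both optimizations can be sketched together in a few lines. Everything here is an assumption made for illustration: the class CalcCachingLookup, its constructor parameters, and the row representation (Object[] with fields name, age, salary) are hypothetical and do not correspond to existing Flink classes. The 'fetcher' stands in for the call to TableFunction's 'eval', and the filter is limited to a condition on the lookup table's own fields (B.salary > 1000).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: cache rows after 'calc' and key the cache by non-constant keys only. */
final class CalcCachingLookup {
    private final Function<Object[], List<Object[]>> fetcher; // stand-in for TableFunction#eval
    private final Object[] constantKeys;                      // e.g. {10} for "B.age = 10"
    private final Predicate<Object[]> filter;                 // e.g. B.salary > 1000
    private final int[] projection;                           // indexes of the fields to keep
    private final Map<List<Object>, List<Object[]>> cache = new HashMap<>();

    CalcCachingLookup(
            Function<Object[], List<Object[]>> fetcher,
            Object[] constantKeys,
            Predicate<Object[]> filter,
            int[] projection) {
        this.fetcher = fetcher;
        this.constantKeys = constantKeys;
        this.filter = filter;
        this.projection = projection;
    }

    List<Object[]> lookup(Object... dynamicKeys) {
        // Optimization 2: the cache key holds only values coming from table A;
        // constants are the same for every row and are left out.
        List<Object> cacheKey = new ArrayList<>(Arrays.asList(dynamicKeys));
        return cache.computeIfAbsent(cacheKey, k -> fetchAndCalc(dynamicKeys));
    }

    private List<Object[]> fetchAndCalc(Object[] dynamicKeys) {
        // The external lookup still receives the full key: dynamic values, then constants.
        Object[] fullKey = Arrays.copyOf(dynamicKeys, dynamicKeys.length + constantKeys.length);
        System.arraycopy(constantKeys, 0, fullKey, dynamicKeys.length, constantKeys.length);

        // Optimization 1: filter and project before the rows reach the cache.
        List<Object[]> kept = new ArrayList<>();
        for (Object[] row : fetcher.apply(fullKey)) {
            if (filter.test(row)) {
                Object[] projected = new Object[projection.length];
                for (int i = 0; i < projection.length; i++) {
                    projected[i] = row[projection[i]];
                }
                kept.add(projected);
            }
        }
        return kept;
    }
}

final class CalcCachingDemo {
    public static void main(String[] args) {
        // Fake lookup table B: returns two rows (name, age, salary) for any key.
        Function<Object[], List<Object[]>> fetcher = key -> Arrays.asList(
                new Object[] {key[0], key[1], 500},
                new Object[] {key[0], key[1], 2000});
        CalcCachingLookup lookup = new CalcCachingLookup(
                fetcher, new Object[] {10}, row -> (Integer) row[2] > 1000, new int[] {0, 2});
        for (Object[] row : lookup.lookup("bob")) {
            System.out.println(Arrays.toString(row)); // only (name, salary) of rows passing the filter
        }
    }
}
```

In this sketch the cached entry for 'bob' holds one two-field row instead of two three-field rows, and the cache key is just ('bob') rather than ('bob', 10).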

Note that this change doesn't mention the Hive lookup connector, because it stores all data in memory. This logic can be replaced in the future by the 'ALL' cache strategy that was mentioned in the original FLIP.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)