Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/01/26 11:42:00 UTC

[jira] [Commented] (FLINK-21145) Flink Temporal Join Hive optimization

    [ https://issues.apache.org/jira/browse/FLINK-21145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272052#comment-17272052 ] 

Jark Wu commented on FLINK-21145:
---------------------------------

I don't think we should provide such a filter connector option. Instead, this can be done by supporting filter pushdown for the lookup table source, i.e. FLINK-18779.
For example, if you have a temporal join query:

{code}
SELECT * FROM T1 JOIN hive_dim FOR SYSTEM_TIME AS OF T1.proctime
ON T1.id = hive_dim.id
WHERE hive_dim.f1 > 1000;
{code}

Then the filter {{hive_dim.f1 > 1000}} will be pushed down into the Hive lookup source, which means the cached Hive partition data is already filtered.
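
To illustrate why pushing the filter into the lookup source reduces memory, here is a rough conceptual sketch in plain Python. It is not Flink or Hive connector code; the sample rows and all names ({{load_partition}}, {{build_cache}}, {{lookup_join}}) are hypothetical and only model the idea of filtering the dimension data when the cache is loaded rather than after the lookup.

```python
# Conceptual sketch only: plain Python, not Flink or Hive connector code.
# All names and sample rows below are hypothetical.

def load_partition():
    """Stand-in for reading the latest partition of hive_dim."""
    return [
        {"id": 1, "f1": 500},
        {"id": 2, "f1": 1500},
        {"id": 3, "f1": 2500},
    ]

def build_cache(rows, predicate=None):
    """Build the in-memory lookup cache keyed by id.

    If a predicate is given, rows are filtered while loading,
    so rejected rows never occupy cache memory.
    """
    cache = {}
    for row in rows:
        if predicate is None or predicate(row):
            cache.setdefault(row["id"], []).append(row)
    return cache

def lookup_join(stream, cache, post_filter=None):
    """Inner-join each stream record with cached dimension rows on id."""
    out = []
    for rec in stream:
        for dim in cache.get(rec["id"], []):
            if post_filter is None or post_filter(dim):
                out.append((rec["id"], dim["f1"]))
    return out

def f1_gt_1000(row):
    """Models the predicate hive_dim.f1 > 1000."""
    return row["f1"] > 1000

t1 = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]

# Without pushdown: the cache holds every row; the filter runs after the lookup.
full_cache = build_cache(load_partition())
without_pushdown = lookup_join(t1, full_cache, post_filter=f1_gt_1000)

# With pushdown: the filter runs at load time, so the cache is smaller.
filtered_cache = build_cache(load_partition(), predicate=f1_gt_1000)
with_pushdown = lookup_join(t1, filtered_cache)
```

Both variants produce the same join result in this sketch; the difference is only in how many dimension rows are kept in the cache.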

If the filter pushdown optimization can address your problem, I will close this issue as a duplicate of FLINK-18779.

> Flink Temporal Join Hive optimization
> -------------------------------------
>
>                 Key: FLINK-21145
>                 URL: https://issues.apache.org/jira/browse/FLINK-21145
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Hive
>    Affects Versions: 1.12.0
>            Reporter: HideOnBush
>            Priority: Major
>
> When Flink performs a temporal join against a Hive dimension table, the latest partition data is loaded into task memory in full, which leads to high memory overhead. In fact, the full latest partition is sometimes not required. Could a future version add an option to filter the dimension table data?
> For example, select * from dim /*'streaming-source.partition.include' ='latest' condition='fild1=ab'*/ would load only the rows of the latest partition where fild1=ab.


