Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/04/02 22:39:00 UTC

[jira] [Updated] (FLINK-22955) lookup join filter push down result to mismatch function signature

     [ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-22955:
-----------------------------------
    Labels: auto-deprioritized-critical stale-major  (was: auto-deprioritized-critical)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
>                 Key: FLINK-22955
>                 URL: https://issues.apache.org/jira/browse/FLINK-22955
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.13.1, 1.12.4
>         Environment: Flink 1.13.1
> How to reproduce: see the attached patch file
>            Reporter: Cooper Luan
>            Priority: Major
>              Labels: auto-deprioritized-critical, stale-major
>         Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> A SQL query like the following may cause a lookup function signature mismatch exception when the SQL is explained:
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> The lookup function is:
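> {code:scala}
> import java.util.concurrent.CompletableFuture
> import java.util.{Collection => JCollection}
> import org.apache.flink.table.data.RowData
> import org.apache.flink.table.functions.AsyncTableFunction
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   // Declares exactly one lookup key argument (joined as T.a = D.id).
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }{code}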
> The execution plan is:
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
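> The "lookup=[age=10, id=a]" keys cause the signature mismatch: the pushed-down filter age = 10 becomes a second lookup key, but the eval method of AsyncTableFunction1 accepts only one.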
>  
> But if I add one more insert, it works:
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> The execution plan is:
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> The LookupJoin node uses "lookup=[id=a]" (correct), not "lookup=[age=10, id=a]" (wrong).
>  
> So in the multi-insert case the planner works correctly,
> while in the single-insert case the planner throws an exception.
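The arity problem in the single-insert plan can be illustrated with a small, self-contained Scala sketch. This is a simplified model, not Flink's actual method-resolution code, which is more involved. In the model, the async lookup call passes the result future first and then one argument per lookup key. With lookup=[id=a] that is one key. With lookup=[age=10, id=a] the pushed-down constant adds a second key, and no eval overload matches. Several details are assumptions made for illustration. AsyncTableFunction1Stub, LookupSignatureSketch, expectedParams and hasMatchingEval are hypothetical names. RowData is replaced by String so the sketch compiles without Flink. The age column is assumed to be INT (java.lang.Integer). The sketch does not model why the multi-insert plan avoids the extra key.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}

// Hypothetical stand-in for AsyncTableFunction1: same eval shape, but RowData is
// replaced by String so this compiles without Flink on the classpath.
class AsyncTableFunction1Stub {
  def eval(resultFuture: CompletableFuture[JCollection[String]], a: Integer): Unit = ()
}

object LookupSignatureSketch {
  // Simplified model of the lookup call: the result future comes first,
  // followed by one argument per lookup key.
  def expectedParams(lookupKeyTypes: Seq[Class[_]]): Seq[Class[_]] =
    classOf[CompletableFuture[_]] +: lookupKeyTypes

  // True if some public method named "eval" has exactly these (erased) parameter types.
  def hasMatchingEval(clazz: Class[_], params: Seq[Class[_]]): Boolean =
    clazz.getMethods.exists { m =>
      m.getName == "eval" && m.getParameterTypes.toSeq == params
    }

  def main(args: Array[String]): Unit = {
    val clazz = classOf[AsyncTableFunction1Stub]
    // lookup=[id=a]: one key, as in the multi-insert plan.
    val oneKey = expectedParams(Seq(classOf[Integer]))
    // lookup=[age=10, id=a]: the pushed-down constant adds a second key
    // (age is assumed to be an INT column, i.e. java.lang.Integer).
    val twoKeys = expectedParams(Seq(classOf[Integer], classOf[Integer]))
    println(s"lookup=[id=a]: eval found = ${hasMatchingEval(clazz, oneKey)}")
    println(s"lookup=[age=10, id=a]: eval found = ${hasMatchingEval(clazz, twoKeys)}")
  }
}
```

Running main reports a match for the one-key case and no match for the two-key case. This mirrors the difference between the two plans above.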



--
This message was sent by Atlassian Jira
(v8.20.1#820001)