Posted to dev@flink.apache.org by "Cooper Luan (Jira)" <ji...@apache.org> on 2021/06/10 04:00:00 UTC

[jira] [Created] (FLINK-22955) lookup join filter push down result to mismatch function signature

Cooper Luan created FLINK-22955:
-----------------------------------

             Summary: lookup join filter push down result to mismatch function signature
                 Key: FLINK-22955
                 URL: https://issues.apache.org/jira/browse/FLINK-22955
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.4, 1.13.1, 1.11.3
         Environment: Flink 1.13.1

How to reproduce: see the attached patch file.
            Reporter: Cooper Luan
         Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch

A SQL query like the following can cause a lookup function signature mismatch exception when the SQL is explained:
{code:sql}
CREATE TEMPORARY VIEW v_vvv AS
SELECT * FROM MyTable AS T
JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id;

SELECT a,b,id,name
FROM v_vvv
WHERE age = 10;{code}
The lookup function is:
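{code:scala}
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.AsyncTableFunction
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  // Accepts exactly one lookup key (id); results are completed into resultFuture.
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}{code}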
The execution plan is:
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Calc(select=[a, b])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
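The "lookup=[age=10, id=a]" entry causes the signature mismatch: the filter age = 10 has been pushed down into the lookup keys, so the lookup function is expected to accept two keys (age and id), while AsyncTableFunction1.eval accepts only one.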
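To illustrate why the extra key breaks the match, here is a minimal, Flink-free sketch. It is a simplified model, not Flink's actual lookup-function resolution logic: it only checks via Java reflection whether a class declares an eval method taking a CompletableFuture followed by exactly the given key types. The names AsyncTableFunction1Stub, LookupSignatureCheck and hasMatchingEval are hypothetical, and treating age as an INT key is an assumption (the report does not state its type; only the key count matters here).

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}

// Hypothetical stand-in for AsyncTableFunction1, without the Flink dependency:
// the async eval takes a result future followed by ONE lookup key (id).
class AsyncTableFunction1Stub {
  def eval(resultFuture: CompletableFuture[JCollection[AnyRef]], a: Integer): Unit = ()
}

object LookupSignatureCheck {
  // Simplified model (assumption): true if `cls` has a public `eval` whose
  // parameters are a CompletableFuture followed by exactly `keyTypes`, in order.
  def hasMatchingEval(cls: Class[_], keyTypes: Seq[Class[_]]): Boolean =
    cls.getMethods.exists { m =>
      val params = m.getParameterTypes.toList
      m.getName == "eval" &&
        params.headOption.contains(classOf[CompletableFuture[_]]) &&
        params.tail == keyTypes.toList
    }

  def main(args: Array[String]): Unit = {
    val fn = classOf[AsyncTableFunction1Stub]
    // lookup=[id=a]: a single INT key
    val singleKey = Seq(classOf[Integer])
    // lookup=[age=10, id=a]: the pushed-down constant becomes a second key
    val twoKeys = Seq(classOf[Integer], classOf[Integer])
    println(s"lookup=[id=a] matches: ${hasMatchingEval(fn, singleKey)}")
    println(s"lookup=[age=10, id=a] matches: ${hasMatchingEval(fn, twoKeys)}")
  }
}
```

Under this model the one-key lookup finds the eval method and the two-key lookup does not, which mirrors the mismatch reported for the single-insert plan.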


However, if I add one more INSERT (a second query writing to another sink), it works well:
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
The execution plan is:
{code:java}
== Optimized Execution Plan ==
LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 10)])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 30)])
   +- Reused(reference_id=[1])

{code}
Here the LookupJoin node uses "lookup=[id=a]" (correct), not "lookup=[age=10, id=a]" (wrong).


So in the multi-insert case the planner works correctly, while in the single-insert case it throws an exception.


