Posted to issues@flink.apache.org by "Venkata krishnan Sowrirajan (Jira)" <ji...@apache.org> on 2024/01/18 18:50:00 UTC

[jira] [Comment Edited] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

    [ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808343#comment-17808343 ] 

Venkata krishnan Sowrirajan edited comment on FLINK-32940 at 1/18/24 6:49 PM:
------------------------------------------------------------------------------

[~jeyhun] Thanks for the detailed response, and sorry for the delayed reply.

Yes, you're correct that _CoreRules.ProjectCorrelateTransposeRule_ alone is not enough: it results in the error you mentioned above, and even after resolving that error by other means it still won't work, as you described.

I was taking the approach of extending _CoreRules.ProjectCorrelateTransposeRule_ and overriding the _onMatch_ method. The issue I faced there is that Calcite doesn't support expressing nested fields on a collection type (Map or Array), e.g. arr.a.b where arr is an Array type. Nested fields are typically represented through _RexFieldAccess_, but it is not supported on a collection type (Array) holding _structs_. I was trying to extend _RexFieldAccess_ within Flink so that it handles the collection case. The new _RexFieldAccess_ (say _FlinkRexFieldAccess_) would be used in the new rule to push the nested projections on the collection down.
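
To make the limitation concrete, here is a minimal, self-contained sketch. It is a toy type model only and does not use Calcite's or Flink's classes; every name in it (NestedProjectionSketch, Scalar, Row, Array, strictAccess, prune) is hypothetical and chosen for illustration. strictAccess mimics field access that is only defined on a struct, while prune shows the collection-aware behaviour that something like the proposed _FlinkRexFieldAccess_ would need: reading arr.a.b as "field a.b of every element of arr".

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT Calcite or Flink API) of pruning a nested field path through an ARRAY. */
public class NestedProjectionSketch {

    /** Minimal type model: a scalar, a row of named fields, or an array of elements. */
    interface Type {}

    static final class Scalar implements Type {
        final String name;
        Scalar(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    static final class Row implements Type {
        final Map<String, Type> fields = new LinkedHashMap<>();
        Row field(String name, Type type) { fields.put(name, type); return this; }
        @Override public String toString() { return "ROW" + fields; }
    }

    static final class Array implements Type {
        final Type element;
        Array(Type element) { this.element = element; }
        @Override public String toString() { return "ARRAY<" + element + ">"; }
    }

    /** Strict access: only a ROW has named fields, so access on an ARRAY is rejected. */
    static Type strictAccess(Type type, String field) {
        if (!(type instanceof Row)) {
            throw new IllegalArgumentException("field access on non-row type: " + type);
        }
        Type result = ((Row) type).fields.get(field);
        if (result == null) {
            throw new IllegalArgumentException("no field named " + field);
        }
        return result;
    }

    /** Collection-aware pruning: keeps only the given path, looking through ARRAY element types. */
    static Type prune(Type type, List<String> path) {
        if (path.isEmpty()) {
            return type; // end of the path: keep whatever is here
        }
        if (type instanceof Array) {
            // arr.a.b means "a.b of every element", so the pruned result stays an array
            return new Array(prune(((Array) type).element, path));
        }
        String head = path.get(0);
        Type child = strictAccess(type, head);
        return new Row().field(head, prune(child, path.subList(1, path.size())));
    }

    public static void main(String[] args) {
        Row employee = new Row()
            .field("empno", new Scalar("BIGINT"))
            .field("ename", new Scalar("VARCHAR"));
        Row dept = new Row()
            .field("deptno", new Scalar("BIGINT"))
            .field("name", new Scalar("VARCHAR"))
            .field("employees", new Array(employee));

        // prints ROW{employees=ARRAY<ROW{ename=VARCHAR}>}
        System.out.println(prune(dept, Arrays.asList("employees", "ename")));
    }
}
```

In a real rule the same idea would have to be expressed on Calcite's row expressions and types; the toy only shows the intended shape of the pruned type for the employees.ename path from this issue.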

 

Additionally, the adjustments you mentioned above are also required to make sure there are no dangling references once the plan is rewritten. Let me know your thoughts. Thanks!

 

I got pulled into other important things for the time being and have not been able to return to this problem. Hopefully, I will get some spare cycles in the coming weeks to address this issue.



> Support projection pushdown to table source for column projections through UDTF
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF such as _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than only _deptno_ and _employees_. If the table source supports nested-field column projection, ideally it should project only _t1.deptno_ and _t1.employees.ename_ from the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]) {code}
>  


