Posted to dev@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2022/08/26 09:40:00 UTC

[jira] [Created] (FLINK-29119) Should clarify how join hints work with CTE

lincoln lee created FLINK-29119:
-----------------------------------

             Summary: Should clarify how join hints work with CTE
                 Key: FLINK-29119
                 URL: https://issues.apache.org/jira/browse/FLINK-29119
             Project: Flink
          Issue Type: Improvement
            Reporter: lincoln lee


Using the source tables from flink-tpch-test:

A join hint works fine when the CTE is referenced through a single expression name:

{code}

Flink SQL> explain with q1 as (SELECT
>   p_name,
>   p_mfgr,
>   p_brand,
>   p_type,
>   s_name,
>   s_address
> FROM
>   part,
>   supplier
> WHERE p_partkey = s_suppkey)
>
> SELECT /*+ SHUFFLE_MERGE(part,supplier)  */ * from q1;
== Abstract Syntax Tree ==
LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11])
+- LogicalFilter(condition=[=($0, $9)])
   +- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part, supplier]]]])
      :- LogicalTableScan(table=[[default_catalog, default_database, part]], hints=[[[ALIAS inheritPath:[] options:[part]]]])
      +- LogicalTableScan(table=[[default_catalog, default_database, supplier]], hints=[[[ALIAS inheritPath:[] options:[supplier]]]])

== Optimized Physical Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address])

== Optimized Execution Plan ==
Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
+- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address])
   :- Exchange(distribution=[hash[p_partkey]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
   +- Exchange(distribution=[hash[s_suppkey]])
      +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address])

{code}

but raises an error when an alias of the same CTE expression name coexists in the query (here q1 is joined with itself under the alias q2):

{code}

Flink SQL> explain with q1 as (SELECT
>   p_name,
>   p_mfgr,
>   p_brand,
>   p_type,
>   s_name,
>   s_address
> FROM
>   part,
>   supplier
> WHERE p_partkey = s_suppkey)
>
> SELECT /*+ SHUFFLE_MERGE(part,supplier)  */ * from q1, q1 q2 where q1.p_name = q2.p_name;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The options of following hints cannot match the name of input tables or views:
`SHUFFLE_MERGE(part, supplier)`

{code}

The expected behavior of join hints with CTEs should be clarified in the documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)