Posted to user-zh@flink.apache.org by iasiuide <ia...@163.com> on 2024/03/08 02:54:19 UTC
Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
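In the SQL fragment below:
ods_ymfz_prod_sys_divide_order is a Kafka source table
dim_ymfz_prod_sys_trans_log is a MySQL dimension table
dim_ptfz_ymfz_merchant_info is a MySQL dimension table
The execution plan fragment shown in the Flink web UI is as follows: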
[1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyy-MM-dd')))])
+- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log], joinType=[LeftOuterJoin], async=[false], lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, bg_rel_trans_id, pay_type, member_id, mer_name])
+- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
+- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id])
+- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
+- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
+- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
....
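The first dimension table is dim_ymfz_prod_sys_trans_log. Why is its condition AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd') not used as a lookup condition in the execution plan? ==> lookup=[bg_rel_trans_id=bg_rel_trans_id]
The second dimension table is dim_ptfz_ymfz_merchant_info. Both parts of its condition ON b.member_id = c.pk_id AND c.data_source = 'merch' are used as lookup conditions in the execution plan ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id]
The third dimension-table join also uses dim_ptfz_ymfz_merchant_info. Its condition is ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent'), and the data_source part is not a lookup condition in the execution plan ==> lookup=[pk_id=agent_id]
So some conditions on a dimension table are used as join (lookup) conditions and others are not? Is there a rule for this? It matters to us because it determines which columns we should index in the dimension tables.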
Re:Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by iasiuide <ia...@163.com>.
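Hi, we are using Flink 1.13.2 and 1.15.4. I checked the Flink UI for the SQL fragment in question, and both versions show the same dimension-table join conditions in the lookup execution plan.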
Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by Yu Chen <yu...@gmail.com>.
Hi iasiuide,
Could you share which versions of Flink and the JDBC connector you are using? As far as I know, FLINK-33365 [1] fixed an issue in the JDBC connector where lookup join conditions were lost.
[1] https://issues.apache.org/jira/browse/FLINK-33365
Best regards,
Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by iasiuide <ia...@163.com>.
The image may not load, so here is the SQL fragment shown in it:
......
END AS trans_type,
a.div_fee_amt,
a.ts
FROM
ods_ymfz_prod_sys_divide_order a
LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS c ON b.member_id = c.pk_id
AND c.data_source = 'merch'
LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d ON c.agent_id = d.pk_id
AND (
d.data_source = 'ex_agent'
OR d.data_source = 'agent'
)
LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d1 ON d.fagent_id = d1.pk_id
AND d1.data_source = 'agent'
WHERE
a.order_state = '2'
AND a.divide_fee_amt > 0
) dat
WHERE
trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyy-MM-dd')
AND CHAR_LENGTH(member_id) > 1;
Re: Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by Jane Chan <qi...@gmail.com>.
Hi iasiuide,
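Thanks for the question. Let me answer the last one first:
> Do some conditions on a dimension table become join (lookup) conditions while others don't? Is there a rule behind this?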
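The ON condition of a lookup join goes through a series of rewrites during optimization. I'll only briefly describe the rewrites that affect lookup=[...] and where=[...].
1. In the logical phase, FlinkFilterJoinRule splits the ON condition into two groups. One group references only one side (the left table or the right table). The other group references both sides.
**Single-side filters are pushed down below the join node whenever possible**, so an extra Filter node may be generated.
What happens to that Filter node afterwards depends on whether the filter can be pushed down into the source. If it cannot, the physical phase turns it into the condition of the Calc node on top of the dimension table (denoted by calcOnTemporalTable).
2. CommonPhysicalLookupJoin resolves allLookupKeys. During that step it tries to extract constant conditions from calcOnTemporalTable, and these form the final lookup keys (the contents of lookup=[...] in the explain plan). During explain, where=[...] is printed whenever calcOnTemporalTable exists.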
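The two rules above can be modeled with a small toy classifier. This is a hypothetical sketch and not Flink's planner code. It assumes each ON conjunct has already been reduced to one of four simplified shapes. The `Conjunct` class and its `kind` values are invented for illustration. Under this model, only `dim_col = left_col` and `dim_col = <literal>` end up in lookup=[...]. Every other dimension-side condition stays in where=[...].

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Conjunct:
    """One AND-ed term of a lookup join ON clause, in a simplified, hypothetical form."""
    dim_col: str  # column of the dimension (temporal) table
    kind: str     # hypothetical tags: "eq_left_col", "eq_literal", "eq_expr" or "sarg"
    value: str    # left-side column, literal, expression text, or comma-separated values


def classify(conjuncts: List[Conjunct]) -> Tuple[List[str], List[str]]:
    """Toy model: split ON conjuncts into (lookup keys, residual where=[...] filters)."""
    lookup: List[str] = []
    where: List[str] = []
    for c in conjuncts:
        if c.kind == "eq_left_col":
            # Equality between a left-side column and a dimension column: a lookup key.
            lookup.append(f"{c.dim_col}={c.value}")
        elif c.kind == "eq_literal":
            # Dimension-only filter against a constant: it lands in calcOnTemporalTable,
            # the constant is extracted as a lookup key, and where=[...] still shows it.
            lookup.append(f"{c.dim_col}='{c.value}'")
            where.append(f"{c.dim_col} = '{c.value}'")
        elif c.kind == "eq_expr":
            # Dimension-only filter whose other side is not a literal
            # (e.g. DATE_FORMAT(CURRENT_TIMESTAMP, ...)): it stays a filter only.
            where.append(f"{c.dim_col} = {c.value}")
        elif c.kind == "sarg":
            # col = v1 OR col = v2 becomes SEARCH(col, Sarg[...]) with sorted values;
            # per the reply, this is not turned into a lookup key.
            values = sorted(v.strip() for v in c.value.split(","))
            where.append(f"SEARCH({c.dim_col}, {values})")
        else:
            raise ValueError(f"unknown conjunct kind: {c.kind}")
    return lookup, where


# Table c: ON b.member_id = c.pk_id AND c.data_source = 'merch'
print(classify([
    Conjunct("pk_id", "eq_left_col", "member_id"),
    Conjunct("data_source", "eq_literal", "merch"),
]))
```

In this model, the columns worth indexing in MySQL are the dimension columns in the first returned list. Flink evaluates the second list on the rows that come back. The key order inside lookup=[...] and the exact extraction behaviour may differ between Flink versions, so confirm against your own EXPLAIN output.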
Back to your specific cases:
> Why is the condition AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd') on the first dimension table, dim_ymfz_prod_sys_trans_log, not used as a lookup condition in the execution plan?
> ==> lookup=[bg_rel_trans_id=bg_rel_trans_id]
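Because b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd') is a condition on the dimension-table side only, and it cannot be pushed down. Also note that it uses a non-deterministic function [1], so please check that your results are correct.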
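A minimal Python sketch shows why the non-deterministic function matters. It assumes that DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') behaves like formatting the processing-time clock in the session time zone. The helper names `trans_date_filter` and `matches` are hypothetical. The same record with the same join key can pass or fail the residual trans_date filter depending on when it is processed. For example, processing just before midnight and just after midnight give different results.

```python
from datetime import datetime, timedelta


def trans_date_filter(processing_time: datetime) -> str:
    """Hypothetical model of DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') at this moment."""
    return processing_time.strftime("%Y%m%d")


def matches(dim_row_trans_date: str, processing_time: datetime) -> bool:
    """Residual where=[...] filter applied to a looked-up dimension row (modeled)."""
    return dim_row_trans_date == trans_date_filter(processing_time)


before_midnight = datetime(2024, 3, 8, 23, 59, 59)
after_midnight = before_midnight + timedelta(seconds=2)

# Same dimension row, same join key, different outcome depending on processing time.
print(matches("20240308", before_midnight))  # True
print(matches("20240308", after_midnight))   # False
```

Recovery, retries or backlog replays change the processing time, so they can change which dimension rows a record joins against. This is the kind of correctness issue the determinism page [1] discusses.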
> For the second dimension table, dim_ptfz_ymfz_merchant_info, both parts of ON b.member_id = c.pk_id AND c.data_source = 'merch' are used as lookup conditions in the execution plan
> ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id]
Here the constant can be extracted.
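In practice, both pk_id and data_source end up in lookup=[...] for table c. A lookup against the MySQL table can therefore filter on both columns in the database. The sketch below only illustrates the shape of such a parameterized point query. The function name `build_lookup_query` is hypothetical. The exact SQL text the JDBC connector generates (quoting, column order, placeholder style) is an assumption, so check the connector source or database logs for your version.

```python
from typing import List


def build_lookup_query(table: str, select_cols: List[str], lookup_keys: List[str]) -> str:
    """Hypothetical helper: build a parameterized point-lookup query from the lookup keys."""
    if not lookup_keys:
        raise ValueError("a lookup join needs at least one lookup key")
    cols = ", ".join(f"`{c}`" for c in select_cols)
    # Only lookup keys reach the database; where=[...] conditions are evaluated in Flink.
    conds = " AND ".join(f"`{k}` = ?" for k in lookup_keys)
    return f"SELECT {cols} FROM `{table}` WHERE {conds}"


# Keys taken from plan node [5]: lookup=[data_source=_UTF-16LE'merch', pk_id=member_id]
print(build_lookup_query(
    "dim_ptfz_ymfz_merchant_info",
    ["pk_id", "agent_id", "bagent_id", "data_source"],
    ["data_source", "pk_id"],
))
```

With both keys in the WHERE clause, a composite MySQL index on (pk_id, data_source) could serve this lookup. For table b, only bg_rel_trans_id appears in lookup=[...], so in the versions discussed here an index on bg_rel_trans_id is what the database query can use.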
> For the third dimension-table join (dim_ptfz_ymfz_merchant_info again), the data_source part of ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent') is not a lookup condition in the execution plan
> ==> lookup=[pk_id=agent_id]
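As far as I know, lookup does not yet support SARGable conditions. Your OR condition is one of these, since it was rewritten into SEARCH(data_source, Sarg[...]).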
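A minimal sketch shows what this means at runtime for table d. It assumes the connector queries by pk_id only, and that Flink then applies the SEARCH(data_source, Sarg[...]) filter to the rows returned. The `dim_rows` data and the `to_sarg` and `lookup_then_filter` helpers are hypothetical. The OR is normalized into a sorted value set, which matches the order Sarg['agent', 'ex_agent'] shown in plan node [7].

```python
from typing import Dict, List


def to_sarg(*values: str) -> List[str]:
    """Hypothetical: normalize `col = v1 OR col = v2 ...` into a sorted, de-duplicated value set."""
    return sorted(set(values))


def lookup_then_filter(dim_rows: List[Dict[str, str]], pk: str,
                       allowed: List[str]) -> List[Dict[str, str]]:
    """Hypothetical two-step lookup: database fetch by pk_id, then a Flink-side filter."""
    # Step 1 (database side): point lookup on the only lookup key, pk_id.
    fetched = [r for r in dim_rows if r["pk_id"] == pk]
    # Step 2 (Flink side): residual where=[SEARCH(data_source, Sarg[...])] filter.
    return [r for r in fetched if r["data_source"] in allowed]


# Hypothetical dimension rows sharing one pk_id across several data_source values.
dim_rows = [
    {"pk_id": "A1", "data_source": "agent", "fagent_id": "F1"},
    {"pk_id": "A1", "data_source": "merch", "fagent_id": "F9"},
    {"pk_id": "A1", "data_source": "ex_agent", "fagent_id": "F2"},
]
sarg = to_sarg("ex_agent", "agent")
print(sarg)
print(lookup_then_filter(dim_rows, "A1", sarg))
```

Only pk_id reaches the database here, so an index on pk_id is what serves this lookup. Rows with other data_source values, such as 'merch', are fetched and then discarded in Flink.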
[1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/
Best,
Jane
Re:Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by iasiuide <ia...@163.com>.
OK, I've posted the SQL fragment.
Re: Flink SQL: which dimension-table join conditions become lookup conditions in the execution plan
Posted by Xuyang <xy...@163.com>.
Hi, your image is broken. You can upload it to an image host or paste the SQL directly.
--
Best!
Xuyang