You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Qishang <zh...@gmail.com> on 2021/03/02 10:50:02 UTC

UDF 重复调用的问题、

Hi 社区。
版本 : Flink 1.12.1
在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
e.g.
INSERT INTO table_a
SELECT
update_time,
    MD5(p_key) AS id,
    p_key
FROM
(
    SELECT
        LOCALTIMESTAMP AS update_time ,
        findkeyudf(p_name) AS p_key
    FROM table_b
) T
WHERE COALESCE(p_key, '')<> ''
;

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, table_b]], fields=[p_name, xxx, ...])

Stage 2 : Operator
content : Calc(select=[CAST(()) AS update_date,
CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
ship_strategy : FORWARD

Stage 3 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[update_date, comp_name, p_key])
ship_strategy : FORWARD

查看 explain,  udf 调用有四次,但是从日志发现同一个Key 执行了 8次。

现在有2个问题:
1. udf 调用不会被优化成一次,结果复用吗?
2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> ''
)是执行了2次的。
3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?

Re: UDF 重复调用的问题、

Posted by Benchao Li <li...@apache.org>.
我没有搜到相关的issue,所以我先建了一个issue[1]。
这个优化相对来说影响比较大,需要仔细的设计和权衡,所以在社区推进的速度
可能没有办法保证,大家感兴趣的可以在issue里去讨论。

[1] https://issues.apache.org/jira/browse/FLINK-21573

Qishang <zh...@gmail.com> 于2021年3月3日周三 上午11:03写道:

> Hi Benchao.
>
> 现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。
> 这个 feature 社区有规划了吗?
>
>
> Benchao Li <li...@apache.org> 于2021年3月3日周三 上午10:23写道:
>
> > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
> >
> > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> > plan的过程中会将表达式完全展开,比如下面的SQL:
> > ```SQL
> > SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> > key3
> > FROM (
> >   SELECT dump_json_to_map(col1) as my_map
> >   FROM T
> > )
> > ```
> > 这种写法也会将`dump_json_to_map`这个函数执行3次。
> >
> > HunterXHunter <13...@qq.com> 于2021年3月3日周三 上午9:43写道:
> >
> > > 为什么4次是没问题的,感觉只执行一次才是最优的
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li

Re: UDF 重复调用的问题、

Posted by Qishang <zh...@gmail.com>.
Hi Benchao.

现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。
这个 feature 社区有规划了吗?


Benchao Li <li...@apache.org> 于2021年3月3日周三 上午10:23写道:

> 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
>
> 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> plan的过程中会将表达式完全展开,比如下面的SQL:
> ```SQL
> SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> key3
> FROM (
>   SELECT dump_json_to_map(col1) as my_map
>   FROM T
> )
> ```
> 这种写法也会将`dump_json_to_map`这个函数执行3次。
>
> HunterXHunter <13...@qq.com> 于2021年3月3日周三 上午9:43写道:
>
> > 为什么4次是没问题的,感觉只执行一次才是最优的
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>

Re: UDF 重复调用的问题、

Posted by Benchao Li <li...@apache.org>.
当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。

这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
plan的过程中会将表达式完全展开,比如下面的SQL:
```SQL
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
key3
FROM (
  SELECT dump_json_to_map(col1) as my_map
  FROM T
)
```
这种写法也会将`dump_json_to_map`这个函数执行3次。

HunterXHunter <13...@qq.com> 于2021年3月3日周三 上午9:43写道:

> 为什么4次是没问题的,感觉只执行一次才是最优的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li

Re: UDF 重复调用的问题、

Posted by HunterXHunter <13...@qq.com>.
为什么4次是没问题的,感觉只执行一次才是最优的



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: UDF 重复调用的问题、

Posted by Qishang <zh...@gmail.com>.
2. 是我搞错了,是四次,没问题


Qishang <zh...@gmail.com> 于2021年3月2日周二 下午6:50写道:

> Hi 社区。
> 版本 : Flink 1.12.1
> 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
> e.g.
> INSERT INTO table_a
> SELECT
> update_time,
>     MD5(p_key) AS id,
>     p_key
> FROM
> (
>     SELECT
>         LOCALTIMESTAMP AS update_time ,
>         findkeyudf(p_name) AS p_key
>     FROM table_b
> ) T
> WHERE COALESCE(p_key, '')<> ''
> ;
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table_b]], fields=[p_name, xxx, ...])
>
> Stage 2 : Operator
> content : Calc(select=[CAST(()) AS update_date,
> CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
> where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
> ship_strategy : FORWARD
>
> Stage 3 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[update_date, comp_name, p_key])
> ship_strategy : FORWARD
>
> 查看 explain,  udf 调用有四次,但是从日志发现同一个Key 执行了 8次。
>
> 现在有2个问题:
> 1. udf 调用不会被优化成一次,结果复用吗?
> 2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> ''
> )是执行了2次的。
> 3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?
>
>
>