You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Xingbo Huang <hx...@gmail.com> on 2021/03/01 01:54:49 UTC
Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Hi,
差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
Best
Xingbo
xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> '20170307'"
> # 获取Query结果
> query_table = env.sql_query(sql)
> query_table.to_pandas()
> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>
>
Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by xiaoyue <18...@163.com>.
Hi, Xingbo
非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>
Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
不好意思回复这么晚。关于pandas
udaf,我有专门测试过框架层的开销(函数用普通的均值计算)。和java相比,差距也就3,4倍左右,具体可以参考代码[1]。关于你这个代码,我怀疑是因为你函数实现的问题。你这个函数构造df是会有额外的开销。你为啥不直接使用j来进行计算。当然了,你也可以根据调整一些参数来提高性能,比如python.fn-execution.bundle.size和python.fn-execution.bundle.time,具体可以参考文档[2]。
[1]
https://github.com/HuangXingBo/pyflink-performance-demo/blob/master/python/flink/flink-pandas-udaf-test.py
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-fn-execution-bundle-size
Best,
Xingbo
xiaoyue@ysstech.com <xi...@ysstech.com> 于2021年3月2日周二 下午1:38写道:
> Hi,
> 是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。
> python版:
> # 建立环境(udaf仅支持批环境)
> env_settings =
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> env = BatchTableEnvironment.create(environment_settings=env_settings)
> # 表1 1千万行
> source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID
> VARCHAR(8),IS_EXCH_DAY DECIMAL
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = 'xxx',
> 'table-name' = 'TP_GL_DAY')
> """
> #表2 700多行
> source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
> SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
> CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = 'xxx',
> 'table-name' = 'TS_PF_SEC_YLDRATE')
> """
> # sink
> print_sink_ddl = """
> CREATE TABLE print(
> pf_id VARCHAR(10),
> out_put FLOAT
> ) WITH (
> 'connector' = 'print'
> )
> """
> # 源表
> env.execute_sql(source_ddl1)
> env.execute_sql(source_ddl2)
> # sink
> env.execute_sql(print_sink_ddl)
>
> sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN
> TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID =
> '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"
>
> # 获取Query结果
> query_table = env.sql_query(sql)
> # 执行udaf
> # udaf 聚合函数计算
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def logReturn(i, j):
> df = pd.DataFrame({'pf_id': i, 'yldrate': j})
> df['yldrate1'] = df['yldrate'] + 1
> return np.prod(df['yldrate1']) - 1
> # 执行并打印
> result =
> query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
> logReturn(
> query_table.PF_ID,
>
> query_table.YLDRATE)).execute_insert('print').wait()
>
> Java版本:
> Java选用的环境是流环境:
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings streamSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, streamSettings);
> streamEnv.execute("");
> 计算部分:
> java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。
> tableEnv.registerFunction("add", new addFunction());
> tableEnv.registerFunction("prod", new ProductUdaf());
> Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as
> yldrate FROM queryData");
> tableEnv.createTemporaryView("addedTable", addedTable);
> Table resultTable = tableEnv.sqlQuery("SELECT
> pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");
>
> 因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10
> 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8.
>
>
> xiaoyue@ysstech.com
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 11:59
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
>
> 然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
>
> Best,
> Xingbo
>
> xiaoyue@ysstech.com <xi...@ysstech.com> 于2021年3月2日周二 上午9:57写道:
>
> > Hi,
> > 我是用的flink1.12的pandas类型的udaf, 代码如下:
> > @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> > def logReturn(i, j):
> > df = pd.DataFrame({'id': i, 'rate': j})
> > df['rate1'] = df['rate'] + 1
> > return numpy.prod(df['rate1']) - 1
> > 调用方式为:
> > result =
> > query_table.group_by(query_table.PF_ID).select(query_table.ID,
> > logReturn(
> > query_table.ID,
> >
> > query_table.RATE)).execute_insert('print').wait()
> > 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> > java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> > 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> > 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> > 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
> >
> >
> >
> > xiaoyue@ysstech.com
> >
> > 发件人: Xingbo Huang
> > 发送时间: 2021-03-02 09:42
> > 收件人: user-zh
> > 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> > Hi,
> >
> > 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
> >
> >
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
> >
> > Best,
> > Xingbo
> >
> > xiaoyue <18...@163.com> 于2021年3月1日周一 上午10:34写道:
> >
> > > Hi, Xingbo
> > > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> > > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> > > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> > >
> > > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> > numpy中的矩阵计算,非常感谢~!
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
> > > >Hi,
> > > >
> > >
> > >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > > >
> > > >Best
> > > >Xingbo
> > > >
> > > >xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> > > >
> > > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > > source1.ID
> > > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> > > >> '20170307'"
> > > >> # 获取Query结果
> > > >> query_table = env.sql_query(sql)
> > > >> query_table.to_pandas()
> > > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > > >>
> > > >>
> > >
> >
>
Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by "xiaoyue@ysstech.com" <xi...@ysstech.com>.
Hi,
是的,就是在batch模式下,我是只在本机local下执行的,不是集群模式,把全部代码贴一下吧。
python版:
# 建立环境(udaf仅支持批环境)
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
env = BatchTableEnvironment.create(environment_settings=env_settings)
# 表1 1千万行
source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID VARCHAR(8),IS_EXCH_DAY DECIMAL
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'xxx',
'table-name' = 'TP_GL_DAY')
"""
#表2 700多行
source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'xxx',
'table-name' = 'TS_PF_SEC_YLDRATE')
"""
# sink
print_sink_ddl = """
CREATE TABLE print(
pf_id VARCHAR(10),
out_put FLOAT
) WITH (
'connector' = 'print'
)
"""
# 源表
env.execute_sql(source_ddl1)
env.execute_sql(source_ddl2)
# sink
env.execute_sql(print_sink_ddl)
sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"
# 获取Query结果
query_table = env.sql_query(sql)
# 执行udaf
# udaf 聚合函数计算
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def logReturn(i, j):
df = pd.DataFrame({'pf_id': i, 'yldrate': j})
df['yldrate1'] = df['yldrate'] + 1
return np.prod(df['yldrate1']) - 1
# 执行并打印
result = query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
logReturn(
query_table.PF_ID,
query_table.YLDRATE)).execute_insert('print').wait()
Java版本:
Java选用的环境是流环境:
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
streamEnv.execute("");
计算部分:
java这边的queryData也是通过定义connector DDL注册源表后,执行sql获取的。
tableEnv.registerFunction("add", new addFunction());
tableEnv.registerFunction("prod", new ProductUdaf());
Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as yldrate FROM queryData");
tableEnv.createTemporaryView("addedTable", addedTable);
Table resultTable = tableEnv.sqlQuery("SELECT pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");
因为java版本代码,是同事写的,但逻辑按照python这边的逻辑,执行时间上python看本机的cpu占用情况(每次执行时不超过8%)会跑400或500s不等,基本维持在400s左右;我的电脑是win10 64位,RAM16GB,主频2.3GHz, 内核4,逻辑处理器8.
xiaoyue@ysstech.com
发件人: Xingbo Huang
发送时间: 2021-03-02 11:59
收件人: user-zh
主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Hi,
首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
Best,
Xingbo
xiaoyue@ysstech.com <xi...@ysstech.com> 于2021年3月2日周二 上午9:57写道:
> Hi,
> 我是用的flink1.12的pandas类型的udaf, 代码如下:
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def logReturn(i, j):
> df = pd.DataFrame({'id': i, 'rate': j})
> df['rate1'] = df['rate'] + 1
> return numpy.prod(df['rate1']) - 1
> 调用方式为:
> result =
> query_table.group_by(query_table.PF_ID).select(query_table.ID,
> logReturn(
> query_table.ID,
>
> query_table.RATE)).execute_insert('print').wait()
> 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
>
>
>
> xiaoyue@ysstech.com
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 09:42
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
>
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
>
> Best,
> Xingbo
>
> xiaoyue <18...@163.com> 于2021年3月1日周一 上午10:34写道:
>
> > Hi, Xingbo
> > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> >
> > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> numpy中的矩阵计算,非常感谢~!
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
> > >Hi,
> > >
> >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > >
> > >Best
> > >Xingbo
> > >
> > >xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> > >
> > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > source1.ID
> > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> > >> '20170307'"
> > >> # 获取Query结果
> > >> query_table = env.sql_query(sql)
> > >> query_table.to_pandas()
> > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > >>
> > >>
> >
>
Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
首先,我假定你是在batch模式上跑的Pandas UDAF(unbounded stream上不支持pandas udaf)。
然后,我再确认另一件事,你使用java写了一个java版本的udaf(logReturn),同你写的这个python版本的udaf进行对比,时间上java版本是3s?python版本的要8分钟?
Best,
Xingbo
xiaoyue@ysstech.com <xi...@ysstech.com> 于2021年3月2日周二 上午9:57写道:
> Hi,
> 我是用的flink1.12的pandas类型的udaf, 代码如下:
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def logReturn(i, j):
> df = pd.DataFrame({'id': i, 'rate': j})
> df['rate1'] = df['rate'] + 1
> return numpy.prod(df['rate1']) - 1
> 调用方式为:
> result =
> query_table.group_by(query_table.PF_ID).select(query_table.ID,
> logReturn(
> query_table.ID,
>
> query_table.RATE)).execute_insert('print').wait()
> 这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
> java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
> 但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
> 总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
> 小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
>
>
>
> xiaoyue@ysstech.com
>
> 发件人: Xingbo Huang
> 发送时间: 2021-03-02 09:42
> 收件人: user-zh
> 主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
> Hi,
>
> 你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
>
> udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
>
> Best,
> Xingbo
>
> xiaoyue <18...@163.com> 于2021年3月1日周一 上午10:34写道:
>
> > Hi, Xingbo
> > 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> > 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> > 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
> >
> > 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or
> numpy中的矩阵计算,非常感谢~!
> >
> >
> >
> >
> >
> >
> >
> > 在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
> > >Hi,
> > >
> >
> >
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> > >
> > >Best
> > >Xingbo
> > >
> > >xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> > >
> > >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> > source1.ID
> > >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> > >> '20170307'"
> > >> # 获取Query结果
> > >> query_table = env.sql_query(sql)
> > >> query_table.to_pandas()
> > >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> > >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> > >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> > >>
> > >>
> >
>
Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by "xiaoyue@ysstech.com" <xi...@ysstech.com>.
Hi,
我是用的flink1.12的pandas类型的udaf, 代码如下:
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def logReturn(i, j):
df = pd.DataFrame({'id': i, 'rate': j})
df['rate1'] = df['rate'] + 1
return numpy.prod(df['rate1']) - 1
调用方式为:
result = query_table.group_by(query_table.PF_ID).select(query_table.ID,
logReturn(
query_table.ID,
query_table.RATE)).execute_insert('print').wait()
这个代码的sink用的print,,其实之前的to_pandas也是用在最后一步来获取计算结果,用于返回的;
java的写法类似,也是定义了udaf之后执行,返回Table类型的结果,再对这个结构处理,取出计算结果;
但两者的执行时间差很多,python用了很多方式,实现同样的简单计算逻辑,都差不多要8分钟左右。
总感觉时间用在query上,之前使用过flink1.11中connector.read.query方式直接获取数据,计算的速度就很快~
小白一个,不太了解flink内部的设计,希望能在这里找到具体的原因~谢谢您啦~
xiaoyue@ysstech.com
发件人: Xingbo Huang
发送时间: 2021-03-02 09:42
收件人: user-zh
主题: Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Hi,
你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
Best,
Xingbo
xiaoyue <18...@163.com> 于2021年3月1日周一 上午10:34写道:
> Hi, Xingbo
> 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
>
> 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
>
>
>
>
>
>
>
> 在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
>
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> >
> >Best
> >Xingbo
> >
> >xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> >
> >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> source1.ID
> >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> >> '20170307'"
> >> # 获取Query结果
> >> query_table = env.sql_query(sql)
> >> query_table.to_pandas()
> >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> >>
> >>
>
Re: Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
你是拿java写的udaf和pandas udaf做性能对比的吗,你是怎么测试的?你是在哪种场景下使用的pandas
udaf?还有就是你用了to_pandas就是sink了,瓶颈就是在这,这玩意儿一般用在debug或者写it用的,不会拿来做性能测试的sink和上生产用的。
Best,
Xingbo
xiaoyue <18...@163.com> 于2021年3月1日周一 上午10:34写道:
> Hi, Xingbo
> 非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
> 项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
> 也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
>
> 所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
>
>
>
>
>
>
>
> 在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
>
> >差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
> >
> >Best
> >Xingbo
> >
> >xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
> >
> >> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
> >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON
> source1.ID
> >> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> >> '20170307'"
> >> # 获取Query结果
> >> query_table = env.sql_query(sql)
> >> query_table.to_pandas()
> >> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
> >> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
> >> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
> >>
> >>
>
Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by xiaoyue <18...@163.com>.
Hi, Xingbo
非常感谢您的回复,转成pandas是想利用pandas中的矩阵计算方法,
项目需求,利用flink计算并将结果直接反馈给前端,所以应该是有source,无sink的过程,
也尝试过定义pandas类型的udaf计算,将聚合结果返回,执行时间并未有太大变化。
所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>
Re:Re: flink1.12 执行sql_query(),同样的数据源表,pyflink执行时间9min,java执行3s
Posted by xiaoyue <18...@163.com>.
所以,想请问一下,对于刚说的那种业务情况,有合适的执行建议么?保证效率的情况下,利用pandas or numpy中的矩阵计算,非常感谢~!
在 2021-03-01 09:54:49,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>差别在于你用了to_pandas(),这个性能慢(这个需要把数据都collect回来到客户端,然后构造一个python的DataFrame,所以慢)。to_pandas一般都是拿来调试用的,很方便,但是性能不行,如果你对性能有要求,你换个sink就行了。
>
>Best
>Xingbo
>
>xiaoyue <18...@163.com> 于2021年2月26日周五 下午12:38写道:
>
>> 不知道大家有没有遇到这个问题,流环境中链接Mysql数据库,利用DDL定义两个数据源表 source1, source2.
>> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # 获取Query结果
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> 相同的处理过程,python和java的处理速度差很多,请问什么原因导致的呢?
>> 由于python只是封装了一下flink的接口,所以会是GIL的影响么?
>> 蹲一个大佬的解答?也欢迎遇到同样问题的小伙伴讨论,thx !
>>
>>