You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 洗你的头 <12...@qq.com> on 2020/10/26 15:04:14 UTC

回复: pyflink 如何正确设置高速度?(如何提速)

感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式

然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点


1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?


2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?


3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?


4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?

最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <hxbks2ks@gmail.com&gt;;
发送时间:&nbsp;2020年10月26日(星期一) 晚上8:47
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: pyflink 如何正确设置高速度?(如何提速)



Hi,

1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你

Best,
Xingbo

洗你的头 <1264386006@qq.com&gt; 于2020年10月26日周一 下午4:32写道:

&gt; 尊敬的开发者您好,
&gt; 我的需求是这样的,
&gt; 拥有数据:
&gt; 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
&gt; 需要做什么?
&gt; 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
&gt; 我现在的代码如下:
&gt; import&amp;nbsp;pandas as&amp;nbsp;pd
&gt; import&amp;nbsp;numpy as&amp;nbsp;np
&gt; from&amp;nbsp;pyflink.datastream import&amp;nbsp;StreamExecutionEnvironment
&gt; from&amp;nbsp;pyflink.table import&amp;nbsp;StreamTableEnvironment, DataTypes
&gt; from&amp;nbsp;pyflink.table.descriptors import&amp;nbsp;Schema, OldCsv, FileSystem
&gt; from&amp;nbsp;pyflink.table.udf import&amp;nbsp;udf
&gt; import&amp;nbsp;os
&gt; import&amp;nbsp;time
&gt; # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
&gt;
&gt; env =&amp;nbsp;StreamExecutionEnvironment.get_execution_environment()
&gt; env.set_parallelism(1)
&gt; t_env =&amp;nbsp;StreamTableEnvironment.create(env)
&gt; t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
&gt; '80m')
&gt; t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
&gt; '100000')
&gt; # 输出表创建
&gt; if&amp;nbsp;os.path.exists('output'):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; os.remove('output')
&gt;
&gt; t_env.connect(FileSystem().path('output')) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_format(OldCsv()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_schema(Schema()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .create_temporary_table('mySink')
&gt; # 交叉口经纬度数据读取
&gt; data =&amp;nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
&gt; coor_o =&amp;nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
&gt; data['O_Y'])))).T
&gt; coor_d =&amp;nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
&gt; data['D_Y'])))).T
&gt; coor =&amp;nbsp;coor_o.append(coor_d).drop_duplicates()
&gt; coor.columns =&amp;nbsp;['lng', 'lat']
&gt; coor =&amp;nbsp;coor.sort_index()
&gt; coor =&amp;nbsp;coor.to_numpy()
&gt; # udf编写与注册
&gt;
&gt;
&gt;
&gt; @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; DataTypes.ARRAY(DataTypes.FLOAT()),
&gt; DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
&gt; def&amp;nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; temp =&amp;nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance =&amp;nbsp;2*np.arctan2(np.sqrt(temp),
&gt; np.sqrt(1-temp))
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance =&amp;nbsp;distance*3958.8*1609.344
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; buffer=100
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; if&amp;nbsp;(distance <=&amp;nbsp;buffer).sum() &amp;gt;&amp;nbsp;0:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return&amp;nbsp;distance.argmin()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; else:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return&amp;nbsp;-1
&gt; # 出行起点数据读取
&gt;
&gt; df =&amp;nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
&gt; use_data =&amp;nbsp;df[['pickup_longitude', 'pickup_latitude']]
&gt; # 处理流程
&gt; t_env.from_pandas(use_data) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .select("distance_meters(pickup_longitude,
&gt; pickup_latitude)") \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .insert_into('mySink')
&gt; # 执行与计时
&gt;
&gt; start_time =&amp;nbsp;time.time()
&gt; t_env.execute("tutorial_job")
&gt; print(time.time() -&amp;nbsp;start_time)
&gt; 我电脑的CPU为12核24线程
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
&gt; 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
&gt; 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
&gt; 期待您的回答,感谢!

Re: pyflink 如何正确设置高速度?(如何提速)

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
>>>
1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
flink的table作业目前没法单独为每一个算子设置并发度,所以你设置并发度为8,就会输出8个文件。我觉得你这数据量不大,本质还是from_pandas的问题,你先把它换了,先用一个并发度玩就行。

>>> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
其实跑pandas
udf的模型是有一个java进程和对应一个Python进程,你的udf在Python进程跑着,数据从Java进程批量发送过去,一次发送多少数据是由python.fn-execution.bundle.size这个配置控制的,对于pandas
udf来说,因为需要把这个数据组织成pandas.series,所以还会有这个配置python.fn-execution.arrow.batch.size。举个例子就是说比如python.fn-execution.bundle.size=6,python.fn-execution.arrow.batch.size=2,那么就是我就会把6条数据,组织称3个pandas.series一次性发送到Python进程,一个pandas.series会调用一次的pandas
udf。所以这里就是调用3次。很明显了,你提高arrow.batch.size的好处是,一个是你组成的pandas.series的数量会更少,很显然每个pandas.series都是有meta信息放在数据头部,越少的pandas.series,那么你传送的数据少一点,通信开销会少一点,另一个好处是你调用udf的次数会减少。当然了你的python.fn-execution.arrow.batch.size是没法超过python.fn-execution.bundle.size,至于说不断增大python.fn-execution.bundle.size是不是就一定是好的也不一定,太大显然你是要buffer数据的,会增大延迟的,而且这时候python进程是空闲的,没有充分调度起来。
关于这两个参数的配置你可以参考文档[1]

>>> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
调节参数大小,你要根据你具体作业来调节。一般来说我们提供的默认值都是较优的,不需要调节。

>>> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
你这应该是24个。你可以通过env.get_parallelism()拿到

我说的可能有点多,希望对你有所帮助。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_config.html#python-options

Best,
Xingbo



洗你的头 <12...@qq.com> 于2020年10月26日周一 下午11:04写道:

> 感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式
>
>
> 然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点
>
>
>
> 1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
>
>
> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
>
>
> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
>
>
> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
>
> 最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> hxbks2ks@gmail.com&gt;;
> 发送时间:&nbsp;2020年10月26日(星期一) 晚上8:47
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: pyflink 如何正确设置高速度?(如何提速)
>
>
>
> Hi,
>
> 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
> 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你
>
> Best,
> Xingbo
>
> 洗你的头 <1264386006@qq.com&gt; 于2020年10月26日周一 下午4:32写道:
>
> &gt; 尊敬的开发者您好,
> &gt; 我的需求是这样的,
> &gt; 拥有数据:
> &gt;
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> &gt; 需要做什么?
> &gt;
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> &gt; 我现在的代码如下:
> &gt; import&amp;nbsp;pandas as&amp;nbsp;pd
> &gt; import&amp;nbsp;numpy as&amp;nbsp;np
> &gt; from&amp;nbsp;pyflink.datastream
> import&amp;nbsp;StreamExecutionEnvironment
> &gt; from&amp;nbsp;pyflink.table import&amp;nbsp;StreamTableEnvironment,
> DataTypes
> &gt; from&amp;nbsp;pyflink.table.descriptors import&amp;nbsp;Schema,
> OldCsv, FileSystem
> &gt; from&amp;nbsp;pyflink.table.udf import&amp;nbsp;udf
> &gt; import&amp;nbsp;os
> &gt; import&amp;nbsp;time
> &gt; # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
> &gt;
> &gt; env =&amp;nbsp;StreamExecutionEnvironment.get_execution_environment()
> &gt; env.set_parallelism(1)
> &gt; t_env =&amp;nbsp;StreamTableEnvironment.create(env)
> &gt;
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> &gt; '80m')
> &gt;
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> &gt; '100000')
> &gt; # 输出表创建
> &gt; if&amp;nbsp;os.path.exists('output'):
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; os.remove('output')
> &gt;
> &gt; t_env.connect(FileSystem().path('output')) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_format(OldCsv()
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field('id', DataTypes.BIGINT())) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_schema(Schema()
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field('id', DataTypes.BIGINT())) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .create_temporary_table('mySink')
> &gt; # 交叉口经纬度数据读取
> &gt; data =&amp;nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> &gt; coor_o =&amp;nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> &gt; data['O_Y'])))).T
> &gt; coor_d =&amp;nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> &gt; data['D_Y'])))).T
> &gt; coor =&amp;nbsp;coor_o.append(coor_d).drop_duplicates()
> &gt; coor.columns =&amp;nbsp;['lng', 'lat']
> &gt; coor =&amp;nbsp;coor.sort_index()
> &gt; coor =&amp;nbsp;coor.to_numpy()
> &gt; # udf编写与注册
> &gt;
> &gt;
> &gt;
> &gt; @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> DataTypes.ARRAY(DataTypes.FLOAT()),
> &gt; DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> &gt; def&amp;nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0],
> lat2=coor[:, 1]):
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; temp
> =&amp;nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&amp;nbsp;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance
> =&amp;nbsp;2*np.arctan2(np.sqrt(temp),
> &gt; np.sqrt(1-temp))
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance
> =&amp;nbsp;distance*3958.8*1609.344
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; buffer=100
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; if&amp;nbsp;(distance
> <=&amp;nbsp;buffer).sum() &amp;gt;&amp;nbsp;0:
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> return&amp;nbsp;distance.argmin()
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; else:
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> return&amp;nbsp;-1
> &gt; # 出行起点数据读取
> &gt;
> &gt; df =&amp;nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> &gt; use_data =&amp;nbsp;df[['pickup_longitude', 'pickup_latitude']]
> &gt; # 处理流程
> &gt; t_env.from_pandas(use_data) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> .select("distance_meters(pickup_longitude,
> &gt; pickup_latitude)") \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .insert_into('mySink')
> &gt; # 执行与计时
> &gt;
> &gt; start_time =&amp;nbsp;time.time()
> &gt; t_env.execute("tutorial_job")
> &gt; print(time.time() -&amp;nbsp;start_time)
> &gt; 我电脑的CPU为12核24线程
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> &gt; 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> &gt; 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> &gt; 期待您的回答,感谢!