You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 洗你的头 <12...@qq.com> on 2020/10/26 15:04:14 UTC
回复: pyflink 如何正确设置高速度?(如何提速)
感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式
然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点
1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。
------------------ 原始邮件 ------------------
发件人: "user-zh" <hxbks2ks@gmail.com>;
发送时间: 2020年10月26日(星期一) 晚上8:47
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: pyflink 如何正确设置高速度?(如何提速)
Hi,
1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你
Best,
Xingbo
洗你的头 <1264386006@qq.com> 于2020年10月26日周一 下午4:32写道:
> 尊敬的开发者您好,
> 我的需求是这样的,
> 拥有数据:
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> 需要做什么?
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> 我现在的代码如下:
> import&nbsp;pandas as&nbsp;pd
> import&nbsp;numpy as&nbsp;np
> from&nbsp;pyflink.datastream import&nbsp;StreamExecutionEnvironment
> from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment, DataTypes
> from&nbsp;pyflink.table.descriptors import&nbsp;Schema, OldCsv, FileSystem
> from&nbsp;pyflink.table.udf import&nbsp;udf
> import&nbsp;os
> import&nbsp;time
> # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
>
> env =&nbsp;StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env =&nbsp;StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> '100000')
> # 输出表创建
> if&nbsp;os.path.exists('output'):
> &nbsp;&nbsp;&nbsp; os.remove('output')
>
> t_env.connect(FileSystem().path('output')) \
> &nbsp;&nbsp;&nbsp; .with_format(OldCsv()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .with_schema(Schema()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink')
> # 交叉口经纬度数据读取
> data =&nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> data['O_Y'])))).T
> coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> data['D_Y'])))).T
> coor =&nbsp;coor_o.append(coor_d).drop_duplicates()
> coor.columns =&nbsp;['lng', 'lat']
> coor =&nbsp;coor.sort_index()
> coor =&nbsp;coor.to_numpy()
> # udf编写与注册
>
>
>
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &nbsp;&nbsp;&nbsp;&nbsp; DataTypes.ARRAY(DataTypes.FLOAT()),
> DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
> &nbsp;&nbsp;&nbsp; temp =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &nbsp;&nbsp;&nbsp; distance =&nbsp;2*np.arctan2(np.sqrt(temp),
> np.sqrt(1-temp))
> &nbsp;&nbsp;&nbsp; distance =&nbsp;distance*3958.8*1609.344
>
> &nbsp;&nbsp;&nbsp; buffer=100
> &nbsp;&nbsp;&nbsp; if&nbsp;(distance <=&nbsp;buffer).sum() &gt;&nbsp;0:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;distance.argmin()
> &nbsp;&nbsp;&nbsp; else:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;-1
> # 出行起点数据读取
>
> df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
> # 处理流程
> t_env.from_pandas(use_data) \
> &nbsp;&nbsp;&nbsp;&nbsp; .select("distance_meters(pickup_longitude,
> pickup_latitude)") \
> &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
> # 执行与计时
>
> start_time =&nbsp;time.time()
> t_env.execute("tutorial_job")
> print(time.time() -&nbsp;start_time)
> 我电脑的CPU为12核24线程
>
>
>
>
>
>
>
>
>
>
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> 期待您的回答,感谢!
Re: pyflink 如何正确设置高速度?(如何提速)
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
>>>
1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
flink的table作业目前没法单独为每一个算子设置并发度,所以你设置并发度为8,就会输出8个文件。我觉得你这数据量不大,本质还是from_pandas的问题,你先把它换了,先用一个并发度玩就行。
>>> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
其实跑pandas
udf的模型是有一个java进程和对应一个Python进程,你的udf在Python进程跑着,数据从Java进程批量发送过去,一次发送多少数据是由python.fn-execution.bundle.size这个配置控制的,对于pandas
udf来说,因为需要把这个数据组织成pandas.series,所以还会有这个配置python.fn-execution.arrow.batch.size。举个例子就是说比如python.fn-execution.bundle.size=6,python.fn-execution.arrow.batch.size=2,那么就是我就会把6条数据,组织称3个pandas.series一次性发送到Python进程,一个pandas.series会调用一次的pandas
udf。所以这里就是调用3次。很明显了,你提高arrow.batch.size的好处是,一个是你组成的pandas.series的数量会更少,很显然每个pandas.series都是有meta信息放在数据头部,越少的pandas.series,那么你传送的数据少一点,通信开销会少一点,另一个好处是你调用udf的次数会减少。当然了你的python.fn-execution.arrow.batch.size是没法超过python.fn-execution.bundle.size,至于说不断增大python.fn-execution.bundle.size是不是就一定是好的也不一定,太大显然你是要buffer数据的,会增大延迟的,而且这时候python进程是空闲的,没有充分调度起来。
关于这两个参数的配置你可以参考文档[1]
>>> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
调节参数大小,你要根据你具体作业来调节。一般来说我们提供的默认值都是较优的,不需要调节。
>>> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
你这应该是24个。你可以通过env.get_parallelism()拿到
我说的可能有点多,希望对你有所帮助。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_config.html#python-options
Best,
Xingbo
洗你的头 <12...@qq.com> 于2020年10月26日周一 下午11:04写道:
> 感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式
>
>
> 然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点
>
>
>
> 1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
>
>
> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
>
>
> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
>
>
> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
>
> 最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。
>
>
> ------------------ 原始邮件 ------------------
> 发件人:
> "user-zh"
> <
> hxbks2ks@gmail.com>;
> 发送时间: 2020年10月26日(星期一) 晚上8:47
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: pyflink 如何正确设置高速度?(如何提速)
>
>
>
> Hi,
>
> 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
> 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你
>
> Best,
> Xingbo
>
> 洗你的头 <1264386006@qq.com> 于2020年10月26日周一 下午4:32写道:
>
> > 尊敬的开发者您好,
> > 我的需求是这样的,
> > 拥有数据:
> >
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> > 需要做什么?
> >
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> > 我现在的代码如下:
> > import&nbsp;pandas as&nbsp;pd
> > import&nbsp;numpy as&nbsp;np
> > from&nbsp;pyflink.datastream
> import&nbsp;StreamExecutionEnvironment
> > from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment,
> DataTypes
> > from&nbsp;pyflink.table.descriptors import&nbsp;Schema,
> OldCsv, FileSystem
> > from&nbsp;pyflink.table.udf import&nbsp;udf
> > import&nbsp;os
> > import&nbsp;time
> > # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
> >
> > env =&nbsp;StreamExecutionEnvironment.get_execution_environment()
> > env.set_parallelism(1)
> > t_env =&nbsp;StreamTableEnvironment.create(env)
> >
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> > '80m')
> >
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> > '100000')
> > # 输出表创建
> > if&nbsp;os.path.exists('output'):
> > &nbsp;&nbsp;&nbsp; os.remove('output')
> >
> > t_env.connect(FileSystem().path('output')) \
> > &nbsp;&nbsp;&nbsp; .with_format(OldCsv()
> >
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > .field('id', DataTypes.BIGINT())) \
> > &nbsp;&nbsp;&nbsp; .with_schema(Schema()
> >
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > .field('id', DataTypes.BIGINT())) \
> > &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink')
> > # 交叉口经纬度数据读取
> > data =&nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> > coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> > data['O_Y'])))).T
> > coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> > data['D_Y'])))).T
> > coor =&nbsp;coor_o.append(coor_d).drop_duplicates()
> > coor.columns =&nbsp;['lng', 'lat']
> > coor =&nbsp;coor.sort_index()
> > coor =&nbsp;coor.to_numpy()
> > # udf编写与注册
> >
> >
> >
> > @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> > &nbsp;&nbsp;&nbsp;&nbsp;
> DataTypes.ARRAY(DataTypes.FLOAT()),
> > DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> > def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0],
> lat2=coor[:, 1]):
> > &nbsp;&nbsp;&nbsp; temp
> =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp;
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> >
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> > &nbsp;&nbsp;&nbsp; distance
> =&nbsp;2*np.arctan2(np.sqrt(temp),
> > np.sqrt(1-temp))
> > &nbsp;&nbsp;&nbsp; distance
> =&nbsp;distance*3958.8*1609.344
> >
> > &nbsp;&nbsp;&nbsp; buffer=100
> > &nbsp;&nbsp;&nbsp; if&nbsp;(distance
> <=&nbsp;buffer).sum() &gt;&nbsp;0:
> >
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return&nbsp;distance.argmin()
> > &nbsp;&nbsp;&nbsp; else:
> >
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> return&nbsp;-1
> > # 出行起点数据读取
> >
> > df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> > use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
> > # 处理流程
> > t_env.from_pandas(use_data) \
> > &nbsp;&nbsp;&nbsp;&nbsp;
> .select("distance_meters(pickup_longitude,
> > pickup_latitude)") \
> > &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
> > # 执行与计时
> >
> > start_time =&nbsp;time.time()
> > t_env.execute("tutorial_job")
> > print(time.time() -&nbsp;start_time)
> > 我电脑的CPU为12核24线程
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> > 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> > 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> > 期待您的回答,感谢!