You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Peihui He <pe...@gmail.com> on 2020/10/30 11:23:31 UTC
flink 1.11.2 keyby 更换partition
hi,all
请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
128, parallesism)
用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
Best Regards.
Re: flink 1.11.2 keyby 更换partition
Posted by Peihui He <pe...@gmail.com>.
Hi,
我理解是不能的。
假设现在 asset_id 有如下数据
0
1
2
3
4
5
6
7
8
9
假设我通过自定义KeySelector,设定key
为Integer,调用KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
10, 10) 这个方法后,并不是我希望的0在0号partition,1在1号partition.结果如下(partition,count):
(1,2)
(3,1)
(4,3)
(5,3)
(8,1)
因为
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
MathUtils.murmurHash 会对integer 做二次计算的。
Best Wishes.
Congxian Qiu <qc...@gmail.com> 于2020年11月2日周一 下午3:19写道:
> Hi
> 自定义的 KeySelector[1] 能否满足呢?
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#keyed-datastream
>
> Best,
> Congxian
>
>
> Peihui He <pe...@gmail.com> 于2020年11月2日周一 下午2:56写道:
>
> > Hi,
> >
> > 不好意思,我这边误导。
> > 现在的情况是这样的
> >
> > 用这个方法测试
> > KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
> > KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ),
> > parallelism)
> > 发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了
> > ),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的
> > KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism
> > ) 这种方式好多了。
> >
> >
> > 请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。
> >
> >
> > Best Wishes.
> >
> >
> >
> > Congxian Qiu <qc...@gmail.com> 于2020年11月2日周一 下午1:52写道:
> >
> > > Hi
> > > 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby
> 会更均匀的话,是不是直接把计算
> > > md5 的逻辑改成计算 hashcode 的逻辑就行了
> > > Best,
> > > Congxian
> > >
> > >
> > > Peihui He <pe...@gmail.com> 于2020年11月2日周一 上午10:01写道:
> > >
> > > > hi,
> > > >
> > > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
> > > >
> > > > Best Wishes.
> > > >
> > > > Zhang Yuxiao <yx...@outlook.com> 于2020年10月31日周六 上午9:38写道:
> > > >
> > > > > 你好,
> > > > >
> > > > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > > > > ________________________________
> > > > > 发件人: Peihui He <pe...@gmail.com>
> > > > > 发送时间: 2020年10月30日 下午 07:23
> > > > > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > > > > 主题: flink 1.11.2 keyby 更换partition
> > > > >
> > > > > hi,all
> > > > >
> > > > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> > > > >
> > > > >
> > > >
> > >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > > > > 128, parallesism)
> > > > >
> > > > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> > > > >
> > > > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> > > > >
> > > > >
> > > > > Best Regards.
> > > > >
> > > >
> > >
> >
>
Re: flink 1.11.2 keyby 更换partition
Posted by Congxian Qiu <qc...@gmail.com>.
Hi
自定义的 KeySelector[1] 能否满足呢?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#keyed-datastream
Best,
Congxian
Peihui He <pe...@gmail.com> 于2020年11月2日周一 下午2:56写道:
> Hi,
>
> 不好意思,我这边误导。
> 现在的情况是这样的
>
> 用这个方法测试
> KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
> KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ),
> parallelism)
> 发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了
> ),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的
> KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism
> ) 这种方式好多了。
>
>
> 请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。
>
>
> Best Wishes.
>
>
>
> Congxian Qiu <qc...@gmail.com> 于2020年11月2日周一 下午1:52写道:
>
> > Hi
> > 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
> > md5 的逻辑改成计算 hashcode 的逻辑就行了
> > Best,
> > Congxian
> >
> >
> > Peihui He <pe...@gmail.com> 于2020年11月2日周一 上午10:01写道:
> >
> > > hi,
> > >
> > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
> > >
> > > Best Wishes.
> > >
> > > Zhang Yuxiao <yx...@outlook.com> 于2020年10月31日周六 上午9:38写道:
> > >
> > > > 你好,
> > > >
> > > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > > > ________________________________
> > > > 发件人: Peihui He <pe...@gmail.com>
> > > > 发送时间: 2020年10月30日 下午 07:23
> > > > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > > > 主题: flink 1.11.2 keyby 更换partition
> > > >
> > > > hi,all
> > > >
> > > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> > > >
> > > >
> > >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > > > 128, parallesism)
> > > >
> > > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> > > >
> > > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> > > >
> > > >
> > > > Best Regards.
> > > >
> > >
> >
>
Re: flink 1.11.2 keyby 更换partition
Posted by Peihui He <pe...@gmail.com>.
Hi,
不好意思,我这边误导。
现在的情况是这样的
用这个方法测试
KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id,
KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ),
parallelism)
发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了
),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的
KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism
) 这种方式好多了。
请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。
Best Wishes.
Congxian Qiu <qc...@gmail.com> 于2020年11月2日周一 下午1:52写道:
> Hi
> 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
> md5 的逻辑改成计算 hashcode 的逻辑就行了
> Best,
> Congxian
>
>
> Peihui He <pe...@gmail.com> 于2020年11月2日周一 上午10:01写道:
>
> > hi,
> >
> > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
> >
> > Best Wishes.
> >
> > Zhang Yuxiao <yx...@outlook.com> 于2020年10月31日周六 上午9:38写道:
> >
> > > 你好,
> > >
> > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > > ________________________________
> > > 发件人: Peihui He <pe...@gmail.com>
> > > 发送时间: 2020年10月30日 下午 07:23
> > > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > > 主题: flink 1.11.2 keyby 更换partition
> > >
> > > hi,all
> > >
> > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> > >
> > >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > > 128, parallesism)
> > >
> > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> > >
> > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> > >
> > >
> > > Best Regards.
> > >
> >
>
Re: flink 1.11.2 keyby 更换partition
Posted by Congxian Qiu <qc...@gmail.com>.
Hi
不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算
md5 的逻辑改成计算 hashcode 的逻辑就行了
Best,
Congxian
Peihui He <pe...@gmail.com> 于2020年11月2日周一 上午10:01写道:
> hi,
>
> 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
>
> Best Wishes.
>
> Zhang Yuxiao <yx...@outlook.com> 于2020年10月31日周六 上午9:38写道:
>
> > 你好,
> >
> > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> > ________________________________
> > 发件人: Peihui He <pe...@gmail.com>
> > 发送时间: 2020年10月30日 下午 07:23
> > 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> > 主题: flink 1.11.2 keyby 更换partition
> >
> > hi,all
> >
> > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
> >
> >
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> > 128, parallesism)
> >
> > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
> >
> > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
> >
> >
> > Best Regards.
> >
>
Re: flink 1.11.2 keyby 更换partition
Posted by Peihui He <pe...@gmail.com>.
hi,
已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。
Best Wishes.
Zhang Yuxiao <yx...@outlook.com> 于2020年10月31日周六 上午9:38写道:
> 你好,
>
> 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
> ________________________________
> 发件人: Peihui He <pe...@gmail.com>
> 发送时间: 2020年10月30日 下午 07:23
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: flink 1.11.2 keyby 更换partition
>
> hi,all
>
> 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
>
> KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
> 128, parallesism)
>
> 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
>
> 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
>
>
> Best Regards.
>
回复: flink 1.11.2 keyby 更换partition
Posted by Zhang Yuxiao <yx...@outlook.com>.
你好,
你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求?
________________________________
发件人: Peihui He <pe...@gmail.com>
发送时间: 2020年10月30日 下午 07:23
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: flink 1.11.2 keyby 更换partition
hi,all
请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。
KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id),
128, parallesism)
用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。
相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。
Best Regards.