Posted to user-zh@flink.apache.org by Xuyang <xy...@163.com> on 2022/06/02 14:08:19 UTC
Re:Re: Re: [Internet]Re: Re: Some question with Flink state
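Hi, strictly speaking, isn't this statement incorrect?
> "It is because with value-state, one task holds multiple keys, and the contents for different keys overwrite each other"
ValueState is also a kind of keyed state, so each key maintains its own ValueState, and different keys are isolated from each other.
In general, storing a Map inside a ValueState is not much different from using MapState directly. They only behave slightly differently across state backends and in how state TTL is applied, which is why storing a Map inside a ValueState is not really recommended.
So it still depends on the specific business scenario: if you are only computing a running total, ValueState is enough.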
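
A minimal sketch of the per-key isolation described above, written as a plain-Java toy model rather than real Flink code. The names KeyedStateSketch, ToyValueState and countClick are made up for illustration; in an actual job the runtime sets the current key before each record and the state is obtained from the runtime context.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of keyed state on ONE subtask. This is not Flink's implementation. */
public class KeyedStateSketch {

    /** Models a ValueState<Long>: the subtask holds one slot per key it has seen. */
    public static class ToyValueState {
        private final Map<String, Long> slots = new HashMap<>();
        private String currentKey; // in Flink, the runtime sets this before each record

        public void setCurrentKey(String key) { this.currentKey = key; }
        public Long value() { return slots.get(currentKey); }
        public void update(Long v) { slots.put(currentKey, v); }
    }

    /** Adds one click for the given key, like a running total kept in ValueState. */
    public static long countClick(ToyValueState state, String key) {
        state.setCurrentKey(key);
        Long previous = state.value();                 // null the first time a key is seen
        long next = (previous == null ? 0L : previous) + 1L;
        state.update(next);
        return next;
    }

    public static void main(String[] args) {
        ToyValueState state = new ToyValueState();     // one subtask, several keys
        System.out.println(countClick(state, "user-1")); // prints 1
        System.out.println(countClick(state, "user-2")); // prints 1, user-1 is untouched
        System.out.println(countClick(state, "user-1")); // prints 2
    }
}
```

Two keys handled by the same subtask never overwrite each other here, because every read and write goes through the current key. MapState adds a second, user-defined key underneath each Flink key, which helps when one key needs several entries (for example a count per item for each user).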
--
Best!
Xuyang
At 2022-05-25 13:38:52, "lxk7491@163.com" <lx...@163.com> wrote:
>
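>I just read up on how key groups work, and I now roughly understand the earlier explanation. Regarding this sentence:
>"With map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user key"
>my understanding is: with value-state, one task holds multiple keys and the contents for different keys overwrite each other, whereas with a map, even if one task has multiple keys, the entry can still be found through the user-defined key.
>If so, map-state is actually the better fit for most scenarios.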
>
>
>lxk7491@163.com
>
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
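>Hey, this actually looks fine. Records with the same key all went to the same task, and it is normal for a task to hold several different keys. Keys are divided into key groups according to the max parallelism, and fixed key groups are then assigned to each task. The only guarantee is that the same key always goes to the same task; there is no guarantee that a task holds only one key. map-state is the right fit for your requirement. With map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user key.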
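
The routing described above can be sketched in plain Java. Flink maps a key to a key group by hashing key.hashCode() and taking the result modulo the max parallelism, then maps the key group to a subtask index as keyGroup * parallelism / maxParallelism. In this sketch the mix function is only a stand-in for Flink's internal hash, and the max parallelism of 128 is an assumption, so the printed subtask numbers will not match a real job.

```java
/** Sketch of key -> key group -> subtask routing. The hash is a stand-in, not Flink's. */
public class KeyGroupSketch {

    /** Stand-in bit mixer that returns a non-negative int (assumption, not Flink's hash). */
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE;
    }

    /** Key group of a key: hash of key.hashCode() modulo the max parallelism. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Subtask that owns a key group: contiguous ranges of key groups per subtask. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value
        int parallelism = 10;     // same as env.setParallelism(10) in the test below
        for (String key : new String[] {"1", "2", "3"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
            System.out.println("key=" + key + " keyGroup=" + keyGroup + " subtask=" + subtask);
        }
    }
}
```

The same key always lands on the same subtask, but nothing prevents two different keys from sharing one. With only three distinct keys and ten subtasks, seeing just two subtasks in the output is therefore possible and not necessarily a bug.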
>
>> On May 25, 2022, at 10:45 AM, lxk7491@163.com wrote:
>>
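>> The images seem to be broken again, so I am resending.
>> Hello, I ran a test and found a problem: when keying by a String I did not get the correct result, but with an int the result was correct. The test also showed that with two keyBy calls, the second one is indeed the one that takes effect.
>>
>> Below are my code and test results.
>>
>> 1. Using an int key
>>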
>> import java.util.Arrays;
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class KeyByTest {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>>
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
>>                 new data(1, "123", "home page"),
>>                 new data(1, "123", "category page"),
>>                 new data(2, "r-123", "search results page"),
>>                 new data(1, "r-123", "my page"),
>>                 new data(3, "r-4567", "search results page")));
>>
>>         // Key by id, then print each record together with the subtask that processes it.
>>         SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
>>                 .map(new RichMapFunction<data, String>() {
>>                     @Override
>>                     public String map(data data) throws Exception {
>>                         System.out.println(data.toString() + " is on subtask: " + getRuntimeContext().getIndexOfThisSubtask());
>>                         return data.toString();
>>                     }
>>                 });
>>
>>         env.execute("test");
>>     }
>> }
>>
>> class data {
>>
>>     private int id;
>>     private String goods;
>>     private String pageName;
>>
>>     public data(int id, String goods, String pageName) {
>>         this.id = id;
>>         this.goods = goods;
>>         this.pageName = pageName;
>>     }
>>
>>     public data() {
>>     }
>>
>>     public int getId() {
>>         return id;
>>     }
>>
>>     public void setId(int id) {
>>         this.id = id;
>>     }
>>
>>     public String getGoods() {
>>         return goods;
>>     }
>>
>>     public void setGoods(String goods) {
>>         this.goods = goods;
>>     }
>>
>>     public String getPageName() {
>>         return pageName;
>>     }
>>
>>     public void setPageName(String pageName) {
>>         this.pageName = pageName;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "data{" +
>>                 "id='" + id + '\'' +
>>                 ", goods='" + goods + '\'' +
>>                 ", pageName='" + pageName + '\'' +
>>                 '}';
>>     }
>> }
>>
>> // Uses the int id as the key.
>> class MyKeySelector implements KeySelector<data, Integer> {
>>
>>     @Override
>>     public Integer getKey(data data) throws Exception {
>>         return data.getId();
>>     }
>> }
>>
>> The console output is as follows:
>> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
>>
>> You can see that the records are grouped by id and distributed to different subtasks.
>>
>> 2. Using a String key. The code is as follows:
>>
>> import java.util.Arrays;
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class KeyByTest {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>>
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
>>                 new data("1", "123", "home page"),
>>                 new data("1", "123", "category page"),
>>                 new data("2", "r-123", "search results page"),
>>                 new data("2", "r-123", "my page"),
>>                 new data("3", "r-4567", "search results page")));
>>
>>         // Key by id, then print each record together with the subtask that processes it.
>>         SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
>>                 .map(new RichMapFunction<data, String>() {
>>                     @Override
>>                     public String map(data data) throws Exception {
>>                         System.out.println(data.toString() + " is on subtask: " + getRuntimeContext().getIndexOfThisSubtask());
>>                         return data.toString();
>>                     }
>>                 });
>>
>>         env.execute("test");
>>     }
>> }
>>
>> class data {
>>
>>     private String id;
>>     private String goods;
>>     private String pageName;
>>
>>     public data(String id, String goods, String pageName) {
>>         this.id = id;
>>         this.goods = goods;
>>         this.pageName = pageName;
>>     }
>>
>>     public data() {
>>     }
>>
>>     public String getId() {
>>         return id;
>>     }
>>
>>     public void setId(String id) {
>>         this.id = id;
>>     }
>>
>>     public String getGoods() {
>>         return goods;
>>     }
>>
>>     public void setGoods(String goods) {
>>         this.goods = goods;
>>     }
>>
>>     public String getPageName() {
>>         return pageName;
>>     }
>>
>>     public void setPageName(String pageName) {
>>         this.pageName = pageName;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "data{" +
>>                 "id='" + id + '\'' +
>>                 ", goods='" + goods + '\'' +
>>                 ", pageName='" + pageName + '\'' +
>>                 '}';
>>     }
>> }
>>
>> // Uses the String id as the key.
>> class MyKeySelector implements KeySelector<data, String> {
>>
>>     @Override
>>     public String getKey(data data) throws Exception {
>>         return data.getId();
>>     }
>> }
>>
>> The final console output is as follows:
>> https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png
>>
>> You can see that the records fell into only two groups. I am not sure whether this is a bug.
>>
>> lxk7491@163.com
>>
>> From: Xuyang
>> Date: 2022-05-24 21:35
>> To: user-zh
>> Subject: Re:Re: Re: Some question with Flink state
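>> I am not certain, but most likely when keyBy is called twice only the later one takes effect, so your earlier keyBy may effectively be useless (you can verify this with an experiment).
>>
>> You can do as you suggested and concatenate the two keys in the record into one string to use as the shuffle key.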
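
A small sketch of the concatenated-key idea, in plain Java. The class CompositeKeySketch, the method compositeKey and the choice of separator are assumptions for illustration; in a Flink job the returned string would be what a single KeySelector returns from getKey. The point it shows is that joining two fields without a separator can make different pairs collide.

```java
/** Sketch: build one string key from two fields so a single keyBy can be used. */
public class CompositeKeySketch {

    // Separator assumed never to occur inside either field (hypothetical choice).
    private static final char SEPARATOR = '\u0001';

    /** Joins the two fields with a separator so that distinct pairs stay distinct. */
    public static String compositeKey(String id, String goods) {
        return id + SEPARATOR + goods;
    }

    public static void main(String[] args) {
        // Plain concatenation: ("1", "23") and ("12", "3") both become "123".
        System.out.println(("1" + "23").equals("12" + "3"));                            // prints true
        // With a separator the two pairs remain different keys.
        System.out.println(compositeKey("1", "23").equals(compositeKey("12", "3")));    // prints false
    }
}
```

An alternative that avoids choosing a separator is to key by a tuple or a small key class with proper equals and hashCode.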
>>
>> At 2022-05-24 21:06:58, "lxk7491@163.com" <lx...@163.com> wrote:
>>> If the problem is the two keyBy calls, I can concatenate the two fields into one string inside a single keyBy. Would that have the same effect as calling keyBy twice?
>>>
>>> lxk7491@163.com
>>>
>>> From: Xuyang
>>> Date: 2022-05-24 20:51
>>> To: user-zh
>>> Subject: Re:Re: Re: Some question with Flink state
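>>> It looks like you called keyBy twice; you can define a custom KeySelector to replace the two calls. Also, if you are worried that records with the same key are not being routed to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
>>> At 2022-05-24 20:43:19, "lxk7491@163.com" <lx...@163.com> wrote: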
>>>>
>>>> https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>>> https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>>>
>>>> How about now?
>>>>
>>>> lxk7491@163.com
>>>>
>>>> From: Xuyang
>>>> Date: 2022-05-24 20:17
>>>> To: user-zh
>>>> Subject: Re:Re: Re: Some question with Flink state
>>>> Hi, your images are still broken; you could try an image hosting service.
>>>>
>>>> At 2022-05-24 13:50:34, "lxk7491@163.com" <lx...@163.com> wrote:
>>>>
>>>> The images seem to have a problem; re-uploading them.
>>>> lxk7491@163.com
>>>> From: Hangxiang Yu
>>>> Date: 2022-05-24 12:09
>>>> To: user-zh
>>>> Subject: Re: Re: Some question with Flink state
>>>> Are you running a DataStream job? If the same key ends up on different parallel instances, it may be related to the KeySelector you wrote (check the comments on KeySelector to see whether yours follows its contract);
>>>> or, if convenient, you could share your key selector logic and the logic that uses the state.
>>>> On Tue, May 24, 2022 at 9:59 AM lxk7491@163.com <lx...@163.com> wrote:
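>>>>> OK. I saw that the emails here were all in English, so I asked my question in English.
>>>>>
>>>>> Let me describe my problem again. I am using keyed state, specifically value-state. In principle, records with the same key should be processed by the same parallel instance. But when I use a parallelism greater than one, it looks like records with the same key are not routed to the same instance. Concretely, my program accumulates the items clicked by the same user. In the data this item appears for the second time, but in the program the state for this item is empty, so the accumulated result is 1 when the correct result should be 2. So I wonder whether value-state is private to each operator instance.
>>>>>
>>>>> But when I use mapstate, the problem no longer seems to occur. So I would like to understand the reason behind this, or whether there is a way to make sure all records with the same key are processed by the same task.
>>>>>
>>>>> lxk7491@163.com
>>>>>
>>>>> From: Hangxiang Yu
>>>>> Date: 2022-05-23 23:09
>>>>> To: user-zh; lxk7491
>>>>> Subject: Re: Some question with Flink state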
>>>>> Hello,
>>>>> No state is shared across different parallel instances.
>>>>> BTW, questions in English can be sent to user@flink.apache.org.
>>>>>
>>>>> Best,
>>>>> Hangxiang.
>>>>>
>>>>> On Mon, May 23, 2022 at 4:03 PM lxk7491@163.com <lx...@163.com> wrote:
>>>>>>
>>>>>> Hi everyone,
>>>>>> I am using Flink keyed state in my project, but I ran into some behavior that confuses me.
>>>>>> When I use value-state with a parallelism greater than one, the value is not what I expected.
>>>>>> So I guess that value-state exists separately in every parallel instance: each instance keeps its own value, which means the value is thread-level.
>>>>>> But when I use map-state, the value is correct, which suggests to me that the map-state is shared by every parallel instance.
>>>>>> Looking forward to your reply.
>>>>>>
>>>>>> lxk7491@163.com
>>>>>>