Posted to user-zh@flink.apache.org by "lxk7491@163.com" <lx...@163.com> on 2022/05/23 08:03:09 UTC

Some question with Flink state

Hi everyone,
   I am using Flink keyed state in my project, but I ran into something that confuses me.
   When I use value state with multiple parallel instances, the value is not what I expect.
   So I guess value state exists separately in every parallel instance, each instance keeping its own value, which would mean the value is thread-level.
   But when I use map state, the value is correct; the map state seems to be shared across all parallel instances.
   Looking forward to your reply.


lxk7491@163.com

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
Below is the relevant part of my code.




This is the latest version; in testing it showed no problems.
But earlier, when I used value state, I could tell from the data that the results were wrong.


lxk7491@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
Are you running a DataStream job? If the same key is being assigned to different parallel instances, it may be related to the KeySelector you wrote (you can check the comments on KeySelector to see whether yours meets its contract).
If convenient, please share the logic of your KeySelector and how you use state.
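The KeySelector contract Hangxiang refers to can be illustrated with a minimal sketch. This is plain Java, not Flink's actual interface; the `Record`, `goodKey`, and `badKey` names are made up for illustration. The key point is that the selector must be deterministic: Flink re-evaluates it on every shuffle, so a selector whose result varies between calls (randomness, current time, object identity) can route the same logical key to different subtasks.

```java
import java.util.Objects;

public class KeySelectorContract {
    // Hypothetical record type, standing in for the user's POJO.
    static final class Record {
        final String userId;
        final String goods;
        Record(String userId, String goods) { this.userId = userId; this.goods = goods; }
    }

    // GOOD: deterministic -- the same record always yields the same key,
    // so every shuffle routes it to the same key group / subtask.
    static String goodKey(Record r) {
        return r.userId;
    }

    // BAD: non-deterministic -- mixing in randomness makes repeated
    // evaluations disagree, so the same logical key can be routed to
    // different subtasks, and keyed state for it appears "missing".
    static String badKey(Record r) {
        return r.userId + "-" + Math.random();
    }

    public static void main(String[] args) {
        Record a = new Record("u1", "g1");
        System.out.println(Objects.equals(goodKey(a), goodKey(a))); // stable across calls
        System.out.println(Objects.equals(badKey(a), badKey(a)));   // unstable across calls
    }
}
```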
 
On Tue, May 24, 2022 at 9:59 AM lxk7491@163.com <lx...@163.com> wrote:
 
> OK. I saw that the emails here were all in English, so I asked my question in English.
>
> Let me restate my problem. I am using keyed state, specifically value state. In theory, records with the same key should be processed by the same parallel instance. But when I run with multiple parallel instances, it looks as if records with the same key are not routed to the same instance. Concretely, my program accumulates the products clicked by the same user; in the data a product appears for the second time, yet its state in the program is empty, so the accumulated result is 1 when it should be 2. So I suspect each operator instance keeps its own value state.
>
> But when I use MapState, the problem seems to go away. I would like to understand why, and whether there is a way to guarantee that all records with the same key are processed by the same task.
>
>
>
> lxk7491@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> No state is shared across different parallel instances.
> BTW, English questions could be sent to user@flink.apache.org.
>
> Best,
> Hangxiang.
>

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
[URL=https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png][IMG]https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png[/IMG][/URL]
[URL=https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png][IMG]https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png[/IMG][/URL]
Can you check whether the images are visible this way?


lxk7491@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, your images are still broken; you could try an image-hosting service.
 
 
 
On 2022-05-24 13:50:34, "lxk7491@163.com" <lx...@163.com> wrote:
 
The images seem to have a problem; re-uploading.
lxk7491@163.com

Re:Re: Re: [Internet]Re: Re: Some question with Flink state

Posted by Xuyang <xy...@163.com>.
Hi, in theory, isn't this statement wrong?


> "if value state is used, one task will hold multiple keys, and the contents of different keys will replace each other"


ValueState is also a kind of keyed state, so each key maintains its own ValueState, and different keys are isolated from one another.
In general, storing a Map inside a ValueState is not much different from using MapState directly; the two only differ slightly across state backends and in state-TTL behavior, which is why storing a Map in a ValueState is not recommended.
So it still comes down to the concrete business scenario: if all you need is an accumulated value, ValueState is enough.
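Xuyang's point, that ValueState is keyed state and each key gets its own isolated value, can be modeled in plain Java. This is a conceptual sketch of the scoping only, not Flink code: a keyed ValueState behaves like a map from the current key to one value, and a keyed MapState like a map from the current key to a nested per-user-key map.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of keyed-state scoping (not Flink's implementation).
public class KeyedStateModel {
    // "ValueState<Long>" across all keys is effectively key -> value.
    private final Map<String, Long> valueState = new HashMap<>();
    // "MapState<String, Long>" across all keys is key -> (userKey -> value).
    private final Map<String, Map<String, Long>> mapState = new HashMap<>();

    // Equivalent of state.update(state.value() + 1) while processing `key`:
    // only this key's slot changes; other keys are untouched.
    public long incrementValueState(String key) {
        long next = valueState.getOrDefault(key, 0L) + 1;
        valueState.put(key, next);
        return next;
    }

    // Equivalent of mapState.put(userKey, ...) while processing `key`:
    // each key owns a separate inner map, keyed again by the user key.
    public long incrementMapState(String key, String userKey) {
        Map<String, Long> perKey = mapState.computeIfAbsent(key, k -> new HashMap<>());
        long next = perKey.getOrDefault(userKey, 0L) + 1;
        perKey.put(userKey, next);
        return next;
    }

    public static void main(String[] args) {
        KeyedStateModel m = new KeyedStateModel();
        System.out.println(m.incrementValueState("u1")); // 1
        System.out.println(m.incrementValueState("u1")); // 2
        System.out.println(m.incrementValueState("u2")); // 1 -- isolated per key
    }
}
```

Because the backing store is partitioned by key, two different keys never overwrite each other's ValueState; the lost count in this thread came from the same logical record being keyed inconsistently (the double keyBy), not from ValueState semantics.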




--

    Best!
    Xuyang





On 2022-05-25 13:38:52, "lxk7491@163.com" <lx...@163.com> wrote:
>
>I just read up on how key groups work, and I can roughly follow the earlier points now. About this sentence:
>"with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user key"
>My understanding: with value state, one task holds multiple keys and the contents of different keys replace each other, whereas with a map, even if the same task has multiple keys, each can still be looked up via the user-defined key.
>If so, map-state would actually suit most scenarios.
>
>
>lxk7491@163.com
> 
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
>Mate, this looks fine: records with the same key were all assigned to the same task, and it is normal for one task to hold several keys. Keys are split into key groups according to the maximum parallelism, and each fixed key group is assigned to a task. Flink only guarantees that the same key goes to the same task, not that a task holds only one key. For your requirement, map-state is the right fit: with map-state, the keys inside those fixed key groups can each be stored separately via the map-state's user key.
> 
>> On May 25, 2022 at 10:45 AM, lxk7491@163.com wrote:
>> 
>> The images seem to have broken again, so I am resending.
>> hello, I ran a test here and found a problem: when I keyBy on a String key I do not get the expected result, while with an int key the result is correct. The test also confirmed that with two consecutive keyBys, the second one is the one that takes effect.
>> 
>> Below is my code and the test results.
>> 
>> 1. Using an int key:
>> 
>> public class KeyByTest {
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>> 
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
>>                 new data(1, "123", "首页"),
>>                 new data(1, "123", "分类页"),
>>                 new data(2, "r-123", "搜索结果页"),
>>                 new data(1, "r-123", "我的页"),
>>                 new data(3, "r-4567", "搜索结果页")));
>> 
>>         SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
>>                 .map(new RichMapFunction<data, String>() {
>> 
>>                     @Override
>>                     public String map(data data) throws Exception {
>>                         System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask());
>>                         return data.toString();
>>                     }
>>                 });
>> 
>>         env.execute("test");
>>     }
>> }
>> 
>> class data {
>> 
>>     private int id;
>>     private String goods;
>>     private String pageName;
>> 
>>     public data(int id, String goods, String pageName) {
>>         this.id = id;
>>         this.goods = goods;
>>         this.pageName = pageName;
>>     }
>> 
>>     public data() {
>>     }
>> 
>>     public int getId() { return id; }
>> 
>>     public void setId(int id) { this.id = id; }
>> 
>>     public String getGoods() { return goods; }
>> 
>>     public void setGoods(String goods) { this.goods = goods; }
>> 
>>     public String getPageName() { return pageName; }
>> 
>>     public void setPageName(String pageName) { this.pageName = pageName; }
>> 
>>     @Override
>>     public String toString() {
>>         return "data{" +
>>                 "id='" + id + '\'' +
>>                 ", goods='" + goods + '\'' +
>>                 ", pageName='" + pageName + '\'' +
>>                 '}';
>>     }
>> }
>> 
>> class MyKeySelector implements KeySelector<data, Integer> {
>> 
>>     @Override
>>     public Integer getKey(data data) throws Exception {
>>         return data.getId();
>>     }
>> }
>> 
>> The console output:
>> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
>> 
>> As you can see, the records are grouped by id and distributed across different subtasks.
>> 
>> 2. Using a String key; the code is as follows:
>> 
>> public class KeyByTest {
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>> 
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
>>                 new data("1", "123", "首页"),
>>                 new data("1", "123", "分类页"),
>>                 new data("2", "r-123", "搜索结果页"),
>>                 new data("2", "r-123", "我的页"),
>>                 new data("3", "r-4567", "搜索结果页")));
>> 
>>         SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
>>                 .map(new RichMapFunction<data, String>() {
>> 
>>                     @Override
>>                     public String map(data data) throws Exception {
>>                         System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask());
>>                         return data.toString();
>>                     }
>>                 });
>> 
>>         env.execute("test");
>>     }
>> }
>> 
>> class data {
>> 
>>     private String id;
>>     private String goods;
>>     private String pageName;
>> 
>>     public data(String id, String goods, String pageName) {
>>         this.id = id;
>>         this.goods = goods;
>>         this.pageName = pageName;
>>     }
>> 
>>     public data() {
>>     }
>> 
>>     public String getId() { return id; }
>> 
>>     public void setId(String id) { this.id = id; }
>> 
>>     public String getGoods() { return goods; }
>> 
>>     public void setGoods(String goods) { this.goods = goods; }
>> 
>>     public String getPageName() { return pageName; }
>> 
>>     public void setPageName(String pageName) { this.pageName = pageName; }
>> 
>>     @Override
>>     public String toString() {
>>         return "data{" +
>>                 "id='" + id + '\'' +
>>                 ", goods='" + goods + '\'' +
>>                 ", pageName='" + pageName + '\'' +
>>                 '}';
>>     }
>> }
>> 
>> class MyKeySelector implements KeySelector<data, String> {
>> 
>>     @Override
>>     public String getKey(data data) throws Exception {
>>         return data.getId();
>>     }
>> }
>> 
>> The final console output:
>> https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png
>> 
>> As you can see, only two groups were produced; I am not sure whether this is a bug.
>> 
>> lxk7491@163.com
>> 
>> From: Xuyang
>> Date: 2022-05-24 21:35
>> To: user-zh
>> Subject: Re:Re: Re: Some question with Flink state
>> 
>> I am not sure, but most likely when you keyBy twice only the second one takes effect, which would make your first keyBy useless (you can verify this). As you described, you can concatenate the two key fields in the record into one string and use it as the shuffle key.
>> 
>> On 2022-05-24 21:06:58, "lxk7491@163.com" <lx...@163.com> wrote:
>> 
>>> If the problem is the double keyBy, can I simply concatenate the two values into one string inside a single keyBy? Would that have the same effect as two keyBys?
>>> 
>>> lxk7491@163.com
>>> 
>>> From: Xuyang
>>> Date: 2022-05-24 20:51
>>> To: user-zh
>>> Subject: Re:Re: Re: Some question with Flink state
>>> 
>>> It looks like you keyBy twice; you can define a single KeySelector to replace both. Also, if you are worried that records with the same key are not going to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
>>> 
>>> On 2022-05-24 20:43:19, "lxk7491@163.com" <lx...@163.com> wrote:
>>> 
>>>> https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>>> https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>>> 
>>>> How about these?
>>>> 
>>>> lxk7491@163.com
> 
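Xuyang's suggestion in the quoted exchange, replacing the two keyBys with one KeySelector that concatenates both fields, can be sketched as follows. This is a plain-Java sketch; the `CompositeKey`/`keyOf` names are made up, and a separator is added so that, for example, ("ab","c") and ("a","bc") cannot collide into the same key.

```java
// Sketch of a composite key: join the two fields with a separator so that
// distinct field pairs cannot produce the same key string.
public class CompositeKey {
    public static String keyOf(String userId, String goods) {
        return userId + "|" + goods;
    }

    public static void main(String[] args) {
        System.out.println(keyOf("u1", "g1"));                       // u1|g1
        System.out.println(keyOf("ab", "c").equals(keyOf("a", "bc"))); // false -- no collision
    }
}
```

In a Flink job this would be the body of a single `KeySelector<data, String>` used in one `keyBy`, replacing the two consecutive `keyBy` calls (the second of which otherwise overrides the first).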

Re: Re: [Internet]Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
I just read up on how key groups work, and I can roughly follow the earlier points now. About this sentence:
"with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user key"
My understanding: with value state, one task holds multiple keys and the contents of different keys replace each other, whereas with a map, even if the same task has multiple keys, each can still be looked up via the user-defined key.
If so, map-state would actually suit most scenarios.


lxk7491@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
Mate, this looks fine: records with the same key were all assigned to the same task, and it is normal for one task to hold several keys. Keys are split into key groups according to the maximum parallelism, and each fixed key group is assigned to a task. Flink only guarantees that the same key goes to the same task, not that a task holds only one key. For your requirement, map-state is the right fit: with map-state, the keys inside those fixed key groups can each be stored separately via the map-state's user key.
 
 

Re: [Internet]Re: Re: Some question with Flink state

Posted by "jurluo(罗凯)" <ju...@tencent.com>.
Mate, this looks fine: records with the same key were all assigned to the same task, and it is normal for one task to hold several keys. Keys are split into key groups according to the maximum parallelism, and each fixed key group is assigned to a task. Flink only guarantees that the same key goes to the same task, not that a task holds only one key. For your requirement, map-state is the right fit: with map-state, the keys inside those fixed key groups can each be stored separately via the map-state's user key.
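This routing can be made concrete with a simplified model. Flink's actual assignment is keyGroup = murmurHash(key.hashCode()) % maxParallelism, and subtask = keyGroup * parallelism / maxParallelism; the sketch below substitutes the plain hashCode for the murmur hash, so its exact numbers differ from Flink's, but the two properties above still hold: the same key always maps to the same subtask, and a handful of keys can easily land on the same subtask.

```java
public class KeyGroupModel {
    // Simplified stand-in for Flink's key-group assignment: real Flink
    // applies a murmur hash to key.hashCode() before the modulo; using
    // hashCode directly changes the numbers but not the guarantees.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // A key group maps to exactly one subtask, so equal keys always
    // land on the same subtask; distinct keys may share one.
    static int subtask(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // Same key -> always the same subtask.
        System.out.println(subtask("1", 128, 10) == subtask("1", 128, 10)); // true
        // With only three String keys and parallelism 10, collisions onto
        // the same subtask are expected -- not a bug.
        for (String k : new String[]{"1", "2", "3"}) {
            System.out.println(k + " -> subtask " + subtask(k, 128, 10));
        }
    }
}
```

Under this model, the String-key run landing on only two subtasks is consistent with normal key-group hashing: keyBy guarantees co-location of equal keys, not an even spread of a few keys across all subtasks.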

> 
> 
> 
> To: user-zh
> 
> 
> 
> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
> 我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。<br/><br/>可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
> 
> 
> 
> 在 2022-05-24 21:06:58,"lxk7491@163.com" <lx...@163.com> 写道:
> 
> 
> 
>> 如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
> 
> 
> 
>> 
> 
> 
> 
>> 
> 
> 
> 
>> 
> 
> 
> 
>> lxk7491@163.com
> 
> 
> 
>> 
> 
> 
> 
>> From: Xuyang
> 
> 
> 
>> Date: 2022-05-24 20:51
> 
> 
> 
>> To: user-zh
> 
> 
> 
>> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
>> 看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
> 
> 
> 
>> 在 2022-05-24 20:43:19,"lxk7491@163.com" <lx...@163.com> 写道:
> 
> 
> 
>>> 
> 
> 
> 
>>> https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
> 
> 
> 
>>> https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
> 
> 
> 
>>> 
> 
> 
> 
>>> 这样呢
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> lxk7491@163.com
> 
> 
> 
>>> 
> 
> 
> 
>>> From: Xuyang
> 
> 
> 
>>> Date: 2022-05-24 20:17
> 
> 
> 
>>> To: user-zh
> 
> 
> 
>>> Subject: Re:Re: Re: Some question with Flink state
> 
> 
> 
>>> Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> 
> 
> 
> 
>>> 在 2022-05-24 13:50:34,"lxk7491@163.com" <lx...@163.com> 写道:
> 
> 
> 
>>> 
> 
> 
> 
>>> 图片好像有点问题,重新上传一下
> 
> 
> 
>>> lxk7491@163.com
> 
> 
> 
>>> From: Hangxiang Yu
> 
> 
> 
>>> Date: 2022-05-24 12:09
> 
> 
> 
>>> To: user-zh
> 
> 
> 
>>> Subject: Re: Re: Some question with Flink state
> 
> 
> 
>>> 你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
> 
> 
> 
>>> selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
> 
> 
> 
>>> 或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
> 
> 
> 
>>> On Tue, May 24, 2022 at 9:59 AM lxk7491@163.com <lx...@163.com> wrote:
> 
> 
> 
>>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 
> 
> 
> 
>>>> 
> 
> 
> 
>>>> lxk7491@163.com
> 
> 
> 
>>>> 
> 
> 
> 
>>>> From: Hangxiang Yu
> 
> 
> 
>>>> Date: 2022-05-23 23:09
> 
> 
> 
>>>> To: user-zh; lxk7491
> 
> 
> 
>>>> Subject: Re: Some question with Flink state
> 
> 
> 
>>>> Hello,
> 
> 
> 
>>>> All states will not be shared in different parallelisms.
> 
> 
> 
>>>> BTW, English questions could be sent to user@flink.apache.org.
> 
> 
> 
>>>> 
> 
> 
> 
>>>> Best,
> 
> 
> 
>>>> Hangxiang.
> 
> 
> 
>>>> 
> 
> 
> 
>>>> On Mon, May 23, 2022 at 4:03 PM lxk7491@163.com <lx...@163.com> wrote:
> 
> 
> 
>>>> 
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> Hi everyone
> 
> 
> 
>>>>>     I was used Flink keyed-state in my Project.But I found some questions
> 
> 
> 
>>>>> that make me confused.
> 
> 
> 
>>>>>     when I used value-state in multi parallelism  the value is not I
> 
> 
> 
>>>> wanted.
> 
> 
> 
>>>>>     So I guess that value-state is in every parallelism. every parallelism
> 
> 
> 
>>>>> saved their only value  which means the value is Thread-Level
> 
> 
> 
>>>>>     But when I used map-state,the value is correctly. I mean the map-state
> 
> 
> 
>>>>> was shared by every parallelism.
> 
> 
> 
>>>>>    looking forward to your reply
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> 
> 
> 
> 
>>>>> lxk7491@163.com
> 
> 
> 
>>>>> 
> 
> 
> 
>>>> 
> 
> 


Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
The images seem to be broken again, so I am resending.

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
Hello, I ran a test and found an issue: when I use a String field as the keyBy key I do not get the correct result, while with an int field the result is correct. The test also confirmed that when keyBy is called twice, only the second keyBy takes effect.
   Below are my code and test results.
    1. Using an int key
           public class KeyByTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
                new data(1, "123", "分类页"),
                new data(2, "r-123", "搜索结果页"),
                new data(1, "r-123", "我的页"),
                new data(3, "r-4567", "搜索结果页")));




        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {

                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask() );
                        return data.toString();
                    }
                });


        env.execute("test");

    }
}
class data{
    private int id;
    private String goods;
    private String pageName;

    public data(int id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }


    public data() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data,Integer>{

    @Override
    public Integer getKey(data data) throws Exception {
        return data.getId();
    }
}
The console output is as follows:
https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
You can see that the records are grouped by id and distributed to different subtasks.
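As background for why the same key always lands on the same subtask: Flink derives the target subtask deterministically from the key's hash (the real implementation in `KeyGroupRangeAssignment` murmur-hashes the key into a key group first; the plain-Java sketch below uses `hashCode` as a simplified stand-in, purely to illustrate the determinism, and is not Flink's actual algorithm):

```java
public class KeyRouting {
    // Simplified model of key -> subtask routing. Flink's real implementation
    // (KeyGroupRangeAssignment) murmur-hashes the serialized key into a key
    // group first; this hashCode-based stand-in only illustrates that routing
    // is a pure function of the key.
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int p = 10;
        // The same key always lands on the same subtask...
        System.out.println(subtaskFor(1, p) == subtaskFor(1, p)); // true
        // ...but distinct keys may share a subtask, so seeing fewer distinct
        // subtasks than distinct keys is expected, not a bug.
        System.out.println(subtaskFor("1", p));
        System.out.println(subtaskFor("2", p));
    }
}
```

Because routing is a pure function of the key, fewer occupied subtasks than distinct keys (as in the output above) is expected behavior rather than lost data.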


2. Using a String key. The code is as follows:
public class KeyByTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
                new data("1", "123", "分类页"),
                new data("2", "r-123", "搜索结果页"),
                new data("2", "r-123", "我的页"),
                new data("3", "r-4567", "搜索结果页")));




        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {

                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask() );
                        return data.toString();
                    }
                });


        env.execute("test");

    }
}
class data{
    private String id;
    private String goods;
    private String pageName;

    public data(String id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }


    public data() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data,String>{

    @Override
    public String getKey(data data) throws Exception {
        return data.getId();
    }
}
The final console output is as follows:
https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png

You can see that only two groups were produced; I am not sure whether this is a bug.


lxk7491@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
I'm not sure, but most likely when keyBy is called twice only the second one takes effect, so your earlier keyBy may effectively be a no-op (you can verify this). As you suggested, you can concatenate those two key fields in your data into one String and use it as the shuffle key.
On 2022-05-24 21:06:58, "lxk7491@163.com" <lx...@163.com> wrote:
>If the problem is calling keyBy twice, can I simply concatenate the two fields into one String inside a single keyBy? Would that have the same effect as the two keyBy calls?
>
>
>
>lxk7491@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>It looks like you called keyBy twice; a single custom KeySelector can replace both. Also, if you are worried that the same key is not routed to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
>On 2022-05-24 20:43:19, "lxk7491@163.com" <lx...@163.com> wrote:
>>
>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>
>>How about these?
>>
>>
>>lxk7491@163.com
>> 
>>From: Xuyang
>>Date: 2022-05-24 20:17
>>To: user-zh
>>Subject: Re:Re: Re: Some question with Flink state
>>Hi, your images are still broken; you could try an image-hosting service.
>> 
>> 
>> 
>>On 2022-05-24 13:50:34, "lxk7491@163.com" <lx...@163.com> wrote:
>> 
>>The images seem to have a problem; re-uploading.
>>lxk7491@163.com
>>From: Hangxiang Yu
>>Date: 2022-05-24 12:09
>>To: user-zh
>>Subject: Re: Re: Some question with Flink state
>>Are you running a DataStream job? If the same key is assigned to different parallel instances, it may be related to the KeySelector you wrote
>>(you can check the KeySelector Javadoc to see whether yours follows its contract);
>>or, if convenient, please share your KeySelector logic and how you use state.
>>On Tue, May 24, 2022 at 9:59 AM lxk7491@163.com <lx...@163.com> wrote:
>>> OK. I saw that the emails on this list were all in English, so I asked my question in English.
>>>
>>> Let me describe my problem again. I am using keyed state, specifically ValueState. In principle, records with the same key should be processed by the same parallel instance, but when I run with multiple parallel instances, the same key does not seem to be handled by the same one. Concretely, my program counts the products clicked by the same user; a product is already the second occurrence in the data, but its state in the program is empty, so the final count is 1 when the correct result should be 2. So I guessed that each operator instance has its own ValueState.
>>>
>>> But when I use MapState, the problem no longer seems to occur. So I would like to understand the reason, or whether there is a way to guarantee that data with the same key is always processed by the same task.
>>>
>>>
>>>
>>> lxk7491@163.com
>>>
>>> From: Hangxiang Yu
>>> Date: 2022-05-23 23:09
>>> To: user-zh; lxk7491
>>> Subject: Re: Some question with Flink state
>>> Hello,
>>> All states will not be shared in different parallelisms.
>>> BTW, English questions could be sent to user@flink.apache.org.
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 4:03 PM lxk7491@163.com <lx...@163.com> wrote:
>>>
>>> >
>>> > Hi everyone
>>> >    I was used Flink keyed-state in my Project.But I found some questions
>>> > that make me confused.
>>> >    when I used value-state in multi parallelism  the value is not I
>>> wanted.
>>> >    So I guess that value-state is in every parallelism. every parallelism
>>> > saved their only value  which means the value is Thread-Level
>>> >    But when I used map-state,the value is correctly. I mean the map-state
>>> > was shared by every parallelism.
>>> >   looking forward to your reply
>>> >
>>> >
>>> > lxk7491@163.com
>>> >
>>>

Re:Re: Re: Some question with Flink state

Posted by Xuyang <xy...@163.com>.
I'm not sure, but most likely when keyBy is called twice only the second one takes effect, so your earlier keyBy may effectively be a no-op (you can verify this). As you suggested, you can concatenate those two key fields in your data into one String and use it as the shuffle key.
On 2022-05-24 21:06:58, "lxk7491@163.com" <lx...@163.com> wrote:
>If the problem is calling keyBy twice, can I simply concatenate the two fields into one String inside a single keyBy? Would that have the same effect as the two keyBy calls?
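The suggestion above, replacing two chained keyBy calls with one composite key, can be sketched like this (a minimal plain-Java helper of my own, not a Flink API; in a real job you would return this value from a single KeySelector):

```java
public class CompositeKey {
    // Build one shuffle key from two fields instead of chaining two keyBy
    // calls (where only the last keyBy determines the partitioning). The
    // delimiter prevents ("ab","c") and ("a","bc") from both becoming "abc".
    public static String of(String userId, String goods) {
        return userId + "|" + goods;
    }

    public static void main(String[] args) {
        System.out.println(of("1", "123"));                      // 1|123
        System.out.println(of("ab", "c").equals(of("a", "bc"))); // false
    }
}
```

The delimiter matters: plain concatenation would make distinct field pairs collide into the same key and silently merge their state.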

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
If the problem is calling keyBy twice, can I simply concatenate the two fields into one String inside a single keyBy? Would that have the same effect as the two keyBy calls?


lxk7491@163.com

From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
It looks like you called keyBy twice; a single custom KeySelector can replace both. Also, if you are worried that the same key is not routed to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
OK, I will give that a try.


lxk7491@163.com

From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
It looks like you called keyBy twice; a single custom KeySelector can replace both. Also, if you are worried that the same key is not routed to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.

Re:Re: Re: Some question with Flink state

Posted by Xuyang <xy...@163.com>.
It looks like you called keyBy twice; a single custom KeySelector can replace both. Also, if you are worried that the same key is not routed to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
On 2022-05-24 20:43:19, "lxk7491@163.com" <lx...@163.com> wrote:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>How about these?

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

How about these?


lxk7491@163.com

From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, your images are still broken; you could try an image-hosting service.

Re:Re: Re: Some question with Flink state

Posted by Xuyang <xy...@163.com>.
Hi, your images are still broken. You could try an image-hosting service.



On 2022-05-24 13:50:34, "lxk7491@163.com" <lx...@163.com> wrote:

The images seem to have a problem; re-uploading them.
lxk7491@163.com
 

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
The images seem to have a problem; re-uploading them.


lxk7491@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
Are you running a DataStream job? If records with the same key are assigned
to different parallel instances, it may be related to the KeySelector you
wrote (you can check the comments on KeySelector to see whether yours meets
its contract). If convenient, you could also share the logic of your
KeySelector and how you use state.
 

Re: Re: Some question with Flink state

Posted by Hangxiang Yu <ma...@gmail.com>.
Are you running a DataStream job? If records with the same key are assigned
to different parallel instances, it may be related to the KeySelector you
wrote (you can check the comments on KeySelector to see whether yours meets
its contract). If convenient, you could also share the logic of your
KeySelector and how you use state.
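As background for the KeySelector advice above: the routing itself is deterministic. Flink hashes the selected key into a key group and maps the key group to a subtask index (keyGroup = murmurHash(key.hashCode()) % maxParallelism; subtask = keyGroup * parallelism / maxParallelism; see KeyGroupRangeAssignment in flink-runtime). The sketch below is a simplified Python model, not Flink code, and `mix()` is a hypothetical stand-in for Flink's murmur hash. The point is that a KeySelector which returns a stable, deterministic value for a logical key sends every record with that key to the same subtask.

```python
# Simplified model (not Flink source) of how a keyed stream assigns a
# record to a parallel subtask. Flink computes, roughly:
#   key_group = murmurHash(key.hashCode()) % maxParallelism
#   subtask   = key_group * parallelism // maxParallelism

def mix(h: int) -> int:
    # Hypothetical stand-in for Flink's murmur hash; any deterministic
    # integer hash illustrates the point.
    h &= 0xFFFFFFFF
    h ^= h >> 16
    h = (h * 0x45D9F3B) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def assign_subtask(key: str, parallelism: int = 4,
                   max_parallelism: int = 128) -> int:
    key_group = mix(hash(key) & 0xFFFFFFFF) % max_parallelism
    return key_group * parallelism // max_parallelism

# A deterministic selector maps the same logical key to one subtask,
# no matter how many records arrive:
subtasks = {assign_subtask("user42_itemA") for _ in range(1000)}
print(subtasks)  # a single subtask index
```

Conversely, if the key type lacks a stable hashCode/equals (for example, using an array or a freshly built object without proper equals/hashCode as the key), the same logical key can hash differently per record and end up on different subtasks.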

On Tue, May 24, 2022 at 9:59 AM lxk7491@163.com <lx...@163.com> wrote:

> OK, I saw that the emails here were all in English, so I asked my question
> in English.
>
> Let me describe my problem again. I am using keyed state, specifically
> ValueState. In principle, records with the same key should be processed by
> the same parallel instance. But when I ran with multiple parallelism, it
> seemed that records with the same key were not assigned to the same
> parallel instance. Concretely, my program accumulates the items clicked by
> the same user. In the data this item is already the second occurrence, but
> in the program the state for it is empty, so the final accumulated result
> is 1 while the correct result should be 2. So I guessed that each operator
> instance has its own ValueState.
>
> But when I use MapState, this problem no longer seems to occur. So I would
> like to understand the reason, or whether there is a way to ensure that
> data with the same key is always processed by the same task.
>
>
>
> lxk7491@163.com
>

Re: Re: Some question with Flink state

Posted by "lxk7491@163.com" <lx...@163.com>.
OK, I saw that the emails here were all in English, so I asked my question in English.

Let me describe my problem again. I am using keyed state, specifically ValueState. In principle, records with the same key should be processed by the same parallel instance. But when I ran with multiple parallelism, it seemed that records with the same key were not assigned to the same parallel instance. Concretely, my program accumulates the items clicked by the same user. In the data this item is already the second occurrence, but in the program the state for it is empty, so the final accumulated result is 1 while the correct result should be 2. So I guessed that each operator instance has its own ValueState.

But when I use MapState, this problem no longer seems to occur. So I would like to understand the reason, or whether there is a way to ensure that data with the same key is always processed by the same task.



lxk7491@163.com
 
From: Hangxiang Yu
Date: 2022-05-23 23:09
To: user-zh; lxk7491
Subject: Re: Some question with Flink state
Hello,
> State is not shared across different parallel instances.
BTW, English questions could be sent to user@flink.apache.org.
 
Best,
Hangxiang.
 

Re: Some question with Flink state

Posted by Hangxiang Yu <ma...@gmail.com>.
Hello,
State is not shared across different parallel instances.
BTW, English questions could be sent to user@flink.apache.org.

Best,
Hangxiang.
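The scoping described above can be shown with a toy model (plain Python, not the Flink API; all names below are illustrative). Keyed state, whether ValueState or MapState, lives per parallel subtask and is further scoped to the current key. If records for one logical key are routed to two different subtasks, each subtask accumulates its own partial count, which reproduces the "result is 1, expected 2" symptom from this thread.

```python
from collections import defaultdict

class ToyKeyedValueState:
    """Toy model of keyed ValueState: one slot per (subtask, key).
    Nothing is shared between subtasks."""
    def __init__(self):
        self._slots = defaultdict(dict)  # subtask -> {key: value}

    def value(self, subtask: int, key: str):
        return self._slots[subtask].get(key)

    def update(self, subtask: int, key: str, v):
        self._slots[subtask][key] = v

def count_click(state, subtask, key):
    # The accumulation logic from the question: read the count, add one.
    current = state.value(subtask, key) or 0
    state.update(subtask, key, current + 1)
    return current + 1

# Correct routing: both clicks for the key reach subtask 0 -> count 2.
state = ToyKeyedValueState()
count_click(state, 0, "user42_itemA")
ok = count_click(state, 0, "user42_itemA")

# Broken routing: the second click lands on subtask 1, whose slot for
# the key is still empty -> count 1, the symptom described above.
state2 = ToyKeyedValueState()
count_click(state2, 0, "user42_itemA")
broken = count_click(state2, 1, "user42_itemA")

print(ok, broken)  # 2 1
```

In other words, MapState appearing to "fix" the problem would not mean it is shared across subtasks; it is scoped exactly like ValueState, and the real fix is a KeySelector that deterministically routes each logical key.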

On Mon, May 23, 2022 at 4:03 PM lxk7491@163.com <lx...@163.com> wrote:

>
> Hi everyone
>    I used Flink keyed state in my project, but I found some things that
> confused me.
>    When I used value state with multiple parallelism, the value was not
> what I wanted.
>    So I guess that value state exists per parallel instance; each parallel
> instance saves its own value, which means the value is thread-level.
>    But when I used map state, the value was correct. I mean the map state
> seemed to be shared across parallel instances.
>   Looking forward to your reply
>
>
> lxk7491@163.com
>