You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by zhang yue <zh...@silvrr.com> on 2019/04/16 12:22:59 UTC

Flink 状态使用问题咨询

   你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。


class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient MapState<String, Long> stat_value;

    @Override
    public void flatMap(ObjectNode input, Collector<JSONObject> out) throws Exception {

        // access the state value

    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "stat_value",String.class, Long.class); // default value of the state, if nothing was set
        stat_value = getRuntimeContext().getMapState(descriptor);
    }
}


Re: Flink 状态使用问题咨询

Posted by "wenlong.lwl" <we...@gmail.com>.
访问db获取初始state,是要hack下的,要自己保存一个KeySelector,算下当前记录的Key是什么,这个逻辑不用改flink,写到你业务代码里就好了。

On Wed, 24 Apr 2019 at 21:23, Shi Quan <qu...@outlook.com> wrote:

> 主要是考虑是在从异常恢复场景下,业务state是否需要重新加载。如果不需要重新加载,就不要记录这么多时间用来判断了。
>
>
>
> Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows 10
>
>
>
> ________________________________
> From: zhang yue <zh...@silvrr.com>
> Sent: Wednesday, April 24, 2019 8:29:07 PM
> To: user-zh@flink.apache.org
> Subject: Re: Flink 状态使用问题咨询
>
> 嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足
>
> > 在 2019年4月24日,下午8:16,Shi Quan <qu...@outlook.com> 写道:
> >
> > initTime < openTime
>
>

RE: Flink 状态使用问题咨询

Posted by Shi Quan <qu...@outlook.com>.
主要是考虑是在从异常恢复场景下,业务state是否需要重新加载。如果不需要重新加载,就不要记录这么多时间用来判断了。



Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: zhang yue <zh...@silvrr.com>
Sent: Wednesday, April 24, 2019 8:29:07 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 状态使用问题咨询

嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足

> 在 2019年4月24日,下午8:16,Shi Quan <qu...@outlook.com> 写道:
>
> initTime < openTime


Re: Flink 状态使用问题咨询

Posted by zhang yue <zh...@silvrr.com>.
嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足

> 在 2019年4月24日,下午8:16,Shi Quan <qu...@outlook.com> 写道:
> 
> initTime < openTime


RE: Flink 状态使用问题咨询

Posted by Shi Quan <qu...@outlook.com>.
有做过类似的事情,不用侵入flink的源码。记录几个关键信息:

  1.  Function open的时间,openTime;
  2.  数据初始化的时间,initTime,可以用State保存;
  3.  真正的业务State



当有数据过来时,iff (null == initTime) || (initTime < openTime) 进行初始数据加载动作。





Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: zhang yue <zh...@silvrr.com>
Sent: Wednesday, April 24, 2019 6:21:57 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 状态使用问题咨询

这种情况我需要改flink源码吗,还是自己实现一个自定义的state类就好了,还有在这个state类中怎么能获取到key呢

> 在 2019年4月17日,上午11:24,wenlong.lwl <we...@gmail.com> 写道:
>
> 可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里
>
> On Tue, 16 Apr 2019 at 20:53, zhang yue <zh...@silvrr.com> wrote:
>
>> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
>> 那现在对于这种场景有什么替代方案吗
>>
>>> 在 2019年4月16日,下午8:33,Congxian Qiu <qc...@gmail.com> 写道:
>>>
>>> Hi
>>> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
>> 相关的事情,到时候就可以了
>>>
>>> Best, Congxian
>>> On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
>>>> 你好,我有一个keyed
>> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>>>>
>>>>
>>>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>>>>
>>>> /**
>>>> * The ValueState handle. The first field is the count, the second field
>> a running sum.
>>>> */
>>>> private transient MapState<String, Long> stat_value;
>>>>
>>>> @Override
>>>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
>> Exception {
>>>>
>>>> // access the state value
>>>>
>>>> }
>>>>
>>>> @Override
>>>> public void open(Configuration config) {
>>>> MapStateDescriptor<String, Long> descriptor =
>>>> new MapStateDescriptor<String, Long>(
>>>> "stat_value",String.class, Long.class); // default value of the state,
>> if nothing was set
>>>> stat_value = getRuntimeContext().getMapState(descriptor);
>>>> }
>>>> }
>>>>
>>
>>


Re: Flink 状态使用问题咨询

Posted by zhang yue <zh...@silvrr.com>.
这种情况我需要改flink源码吗,还是自己实现一个自定义的state类就好了,还有在这个state类中怎么能获取到key呢

> 在 2019年4月17日,上午11:24,wenlong.lwl <we...@gmail.com> 写道:
> 
> 可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里
> 
> On Tue, 16 Apr 2019 at 20:53, zhang yue <zh...@silvrr.com> wrote:
> 
>> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
>> 那现在对于这种场景有什么替代方案吗
>> 
>>> 在 2019年4月16日,下午8:33,Congxian Qiu <qc...@gmail.com> 写道:
>>> 
>>> Hi
>>> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
>> 相关的事情,到时候就可以了
>>> 
>>> Best, Congxian
>>> On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
>>>> 你好,我有一个keyed
>> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>>>> 
>>>> 
>>>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>>>> 
>>>> /**
>>>> * The ValueState handle. The first field is the count, the second field
>> a running sum.
>>>> */
>>>> private transient MapState<String, Long> stat_value;
>>>> 
>>>> @Override
>>>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
>> Exception {
>>>> 
>>>> // access the state value
>>>> 
>>>> }
>>>> 
>>>> @Override
>>>> public void open(Configuration config) {
>>>> MapStateDescriptor<String, Long> descriptor =
>>>> new MapStateDescriptor<String, Long>(
>>>> "stat_value",String.class, Long.class); // default value of the state,
>> if nothing was set
>>>> stat_value = getRuntimeContext().getMapState(descriptor);
>>>> }
>>>> }
>>>> 
>> 
>> 


Re: Flink 状态使用问题咨询

Posted by "wenlong.lwl" <we...@gmail.com>.
可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里

On Tue, 16 Apr 2019 at 20:53, zhang yue <zh...@silvrr.com> wrote:

> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
> 那现在对于这种场景有什么替代方案吗
>
> > 在 2019年4月16日,下午8:33,Congxian Qiu <qc...@gmail.com> 写道:
> >
> > Hi
> > 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
> 相关的事情,到时候就可以了
> >
> > Best, Congxian
> > On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
> >> 你好,我有一个keyed
> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
> >>
> >>
> >> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
> >>
> >> /**
> >> * The ValueState handle. The first field is the count, the second field
> a running sum.
> >> */
> >> private transient MapState<String, Long> stat_value;
> >>
> >> @Override
> >> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws
> Exception {
> >>
> >> // access the state value
> >>
> >> }
> >>
> >> @Override
> >> public void open(Configuration config) {
> >> MapStateDescriptor<String, Long> descriptor =
> >> new MapStateDescriptor<String, Long>(
> >> "stat_value",String.class, Long.class); // default value of the state,
> if nothing was set
> >> stat_value = getRuntimeContext().getMapState(descriptor);
> >> }
> >> }
> >>
>
>

Re: Flink 状态使用问题咨询

Posted by zhang yue <zh...@silvrr.com>.
是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state 那现在对于这种场景有什么替代方案吗

> 在 2019年4月16日,下午8:33,Congxian Qiu <qc...@gmail.com> 写道:
> 
> Hi
> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer 相关的事情,到时候就可以了
> 
> Best, Congxian
> On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
>> 你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>> 
>> 
>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>> 
>> /**
>> * The ValueState handle. The first field is the count, the second field a running sum.
>> */
>> private transient MapState<String, Long> stat_value;
>> 
>> @Override
>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws Exception {
>> 
>> // access the state value
>> 
>> }
>> 
>> @Override
>> public void open(Configuration config) {
>> MapStateDescriptor<String, Long> descriptor =
>> new MapStateDescriptor<String, Long>(
>> "stat_value",String.class, Long.class); // default value of the state, if nothing was set
>> stat_value = getRuntimeContext().getMapState(descriptor);
>> }
>> }
>> 


Re: Flink 状态使用问题咨询

Posted by zhang yue <zh...@silvrr.com>.
Savepoint Reader/Writer 首先将外部数据写入savepoint,然后从savepoint启动吗

> 在 2019年4月16日,下午8:33,Congxian Qiu <qc...@gmail.com> 写道:
> 
> Hi
> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer 相关的事情,到时候就可以了
> 
> Best, Congxian
> On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
>> 你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>> 
>> 
>> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>> 
>> /**
>> * The ValueState handle. The first field is the count, the second field a running sum.
>> */
>> private transient MapState<String, Long> stat_value;
>> 
>> @Override
>> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws Exception {
>> 
>> // access the state value
>> 
>> }
>> 
>> @Override
>> public void open(Configuration config) {
>> MapStateDescriptor<String, Long> descriptor =
>> new MapStateDescriptor<String, Long>(
>> "stat_value",String.class, Long.class); // default value of the state, if nothing was set
>> stat_value = getRuntimeContext().getMapState(descriptor);
>> }
>> }
>> 


Re: Flink状态使用问题咨询

Posted by Congxian Qiu <qc...@gmail.com>.
Hi
如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer 相关的事情,到时候就可以了

Best, Congxian
On Apr 16, 2019, 20:27 +0800, zhang yue <zh...@silvrr.com>, wrote:
> 你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
>
>
> class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
>
> /**
> * The ValueState handle. The first field is the count, the second field a running sum.
> */
> private transient MapState<String, Long> stat_value;
>
> @Override
> public void flatMap(ObjectNode input, Collector<JSONObject> out) throws Exception {
>
> // access the state value
>
> }
>
> @Override
> public void open(Configuration config) {
> MapStateDescriptor<String, Long> descriptor =
> new MapStateDescriptor<String, Long>(
> "stat_value",String.class, Long.class); // default value of the state, if nothing was set
> stat_value = getRuntimeContext().getMapState(descriptor);
> }
> }
>