You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by ReignsDYL <19...@qq.com> on 2019/06/26 06:22:33 UTC

checkpoint stage size的问题

各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by ReignsDYL <19...@qq.com>.
我发现窗口的trigger只进行了fire,并没有进行purge,我不清楚是不是这个原因,或者还是有其他的原因。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by ReignsDYL <19...@qq.com>.
您好,感谢您的回复。

是这样,开始可能只是20几MB,但是只要有数据流入,它就一直变大,几个小时后,就达到了几百MB,并没有发现清理或者变小的现象。operator的每个subtask的stage
zise也是均匀的。
另外,我简单的写了个demo,就是从kafka读数据,然后保存到hbase,我发现那个checkpoint的stage
size虽然只有几十k,但是也在慢慢增长,每次比前一个均匀增加。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by Yun Tang <my...@live.com>.
你好

从附件的web监控看,其实你的整体checkpoint state其实很小(只有20几MB),所以对于这个问题其实有些过度关注了。
关于checkpoint state的变化,需要观察不同operator的情况,可以点开详细页看每个并发的情况。对比operator state和window所使用的keyed state的变化情况。我估计keyed state部分会有些许波动,主要是因为你使用的是RocksDB state backend,其实上传的是rocksDB的sst文件,当register timer时,window state会进行存储,当onTimer时,相关state会取出并更新或者删除,这里涉及到一个写放大和compaction的问题,rocksDB对某个key的删除不会直接对应物理上的存储的立刻减少。

祝好
唐云
________________________________
From: ReignsDYL <19...@qq.com>
Sent: Wednesday, June 26, 2019 17:38
To: user-zh@flink.apache.org
Subject: Re: checkpoint stage size的问题

这是web ui的监控
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png>



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by ReignsDYL <19...@qq.com>.
这是web ui的监控
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png> 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by ReignsDYL <19...@qq.com>.
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png> 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by ReignsDYL <19...@qq.com>.
老师你好,首先感谢你在百忙之中回复我。
我这面观察到的现象是,当有数据流入时,每个checkpoint的stage
size比上一个checkpoint多几百k左右,只要数据持续流入,这个stage
size就一直增长,当没有数据流入时,checkpoint的stage size就维持不变了,再有数据流入时,stage
size就在原来基础上继续增长。

数据流:
SingleOutputStreamOperator<StudentAggResult> studentSubjectStream =
dataStream
                .filter(new Question2SubjectFilter())
                .keyBy(new TaskStudentSubjectKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new StudentSubjectScoreAgg());
       
studentSubjectStream.addSink(getKafkaProducer(KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(),
StudentAggResult.class));

聚合函数:
public abstract class BaseAgg<T, R extends IMergeable> implements
AggregateFunction<T, R, R> {

    public abstract R create(T input);
    public abstract void merge(R aggResult, T t);

    @Override
    public R createAccumulator() {
        return null;
    }

    @Override
    public R add(T t, R aggResult) {
        if (aggResult == null) {
            aggResult = create(t);
        }
        merge(aggResult, t);
        return aggResult;
    }

    @Override
    public R getResult(R aggResult) {
        return aggResult;
    }

    @Override
    public R merge(R aggResult, R acc1) {
        if (acc1 == null) {
            return aggResult;
        }
        if (aggResult == null) {
            return acc1;
        }
        aggResult.merge(acc1);
        return aggResult;
    }
}

checkpoint配置:
    env.enableCheckpointing(5000);
   
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);            
   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

状态存储通过rocksdb。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: checkpoint stage size的问题

Posted by CHENJIE <or...@126.com>.
你好,
        如果有需要session窗口可能保持很长时间,数据量也很大,这种窗口会导致checkpoint stage size变的非常大
        有没有一种机制可能让超过一定时间的状态失效并且丢弃掉?








在 2019-06-26 16:23:13,"Yun Tang" <my...@live.com> 写道:
>你好
>
>这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因:
>
>  1.  上游数据量增大。
>  2.  window设置时间较长,尚未触发,导致window内积攒的数据比较大。
>  3.  window的类型决定了所需要存储的state size较大。
>
>可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。
>
>[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations
>
>祝好
>唐云
>________________________________
>From: ReignsDYL <19...@qq.com>
>Sent: Wednesday, June 26, 2019 14:22
>To: user-zh@flink.apache.org
>Subject: checkpoint stage size的问题
>
>各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
>size越来越大,请问是什么原因啊?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: checkpoint stage size的问题

Posted by Yun Tang <my...@live.com>.
你好

这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因:

  1.  上游数据量增大。
  2.  window设置时间较长,尚未触发,导致window内积攒的数据比较大。
  3.  window的类型决定了所需要存储的state size较大。

可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations

祝好
唐云
________________________________
From: ReignsDYL <19...@qq.com>
Sent: Wednesday, June 26, 2019 14:22
To: user-zh@flink.apache.org
Subject: checkpoint stage size的问题

各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?



--
Sent from: http://apache-flink.147419.n8.nabble.com/