You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by ReignsDYL <19...@qq.com> on 2019/06/26 06:22:33 UTC
checkpoint stage size的问题
各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by ReignsDYL <19...@qq.com>.
我发现窗口的trigger只进行了fire,并没有进行purge,我不清楚是不是这个原因,或者还是有其他的原因。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by ReignsDYL <19...@qq.com>.
您好,感谢您的回复。
是这样,开始可能只是20几MB,但是只要有数据流入,它就一直变大,几个小时后,就达到了几百MB,并没有发现清理或者变小的现象。operator的每个subtask的stage
zise也是均匀的。
另外,我简单的写了个demo,就是从kafka读数据,然后保存到hbase,我发现那个checkpoint的stage
size虽然只有几十k,但是也在慢慢增长,每次比前一个均匀增加。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by Yun Tang <my...@live.com>.
你好
从附件的web监控看,其实你的整体checkpoint state其实很小(只有20几MB),所以对于这个问题其实有些过度关注了。
关于checkpoint state的变化,需要观察不同operator的情况,可以点开详细页看每个并发的情况。对比operator state和window所使用的keyed state的变化情况。我估计keyed state部分会有些许波动,主要是因为你使用的是RocksDB state backend,其实上传的是rocksDB的sst文件,当register timer时,window state会进行存储,当onTimer时,相关state会取出并更新或者删除,这里涉及到一个写放大和compaction的问题,rocksDB对某个key的删除不会直接对应物理上的存储的立刻减少。
祝好
唐云
________________________________
From: ReignsDYL <19...@qq.com>
Sent: Wednesday, June 26, 2019 17:38
To: user-zh@flink.apache.org
Subject: Re: checkpoint stage size的问题
这是web ui的监控
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png>
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by ReignsDYL <19...@qq.com>.
这是web ui的监控
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png>
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by ReignsDYL <19...@qq.com>.
<http://apache-flink.147419.n8.nabble.com/file/t26/checkpoint.png>
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by ReignsDYL <19...@qq.com>.
老师你好,首先感谢你在百忙之中回复我。
我这面观察到的现象是,当有数据流入时,每个checkpoint的stage
size比上一个checkpoint多几百k左右,只要数据持续流入,这个stage
size就一直增长,当没有数据流入时,checkpoint的stage size就维持不变了,再有数据流入时,stage
size就在原来基础上继续增长。
数据流:
SingleOutputStreamOperator<StudentAggResult> studentSubjectStream =
dataStream
.filter(new Question2SubjectFilter())
.keyBy(new TaskStudentSubjectKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new StudentSubjectScoreAgg());
studentSubjectStream.addSink(getKafkaProducer(KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(),
StudentAggResult.class));
聚合函数:
public abstract class BaseAgg<T, R extends IMergeable> implements
AggregateFunction<T, R, R> {
public abstract R create(T input);
public abstract void merge(R aggResult, T t);
@Override
public R createAccumulator() {
return null;
}
@Override
public R add(T t, R aggResult) {
if (aggResult == null) {
aggResult = create(t);
}
merge(aggResult, t);
return aggResult;
}
@Override
public R getResult(R aggResult) {
return aggResult;
}
@Override
public R merge(R aggResult, R acc1) {
if (acc1 == null) {
return aggResult;
}
if (aggResult == null) {
return acc1;
}
aggResult.merge(acc1);
return aggResult;
}
}
checkpoint配置:
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
状态存储通过rocksdb。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: checkpoint stage size的问题
Posted by CHENJIE <or...@126.com>.
你好,
如果有需要session窗口可能保持很长时间,数据量也很大,这种窗口会导致checkpoint stage size变的非常大
有没有一种机制可能让超过一定时间的状态失效并且丢弃掉?
在 2019-06-26 16:23:13,"Yun Tang" <my...@live.com> 写道:
>你好
>
>这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因:
>
> 1. 上游数据量增大。
> 2. window设置时间较长,尚未触发,导致window内积攒的数据比较大。
> 3. window的类型决定了所需要存储的state size较大。
>
>可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。
>
>[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations
>
>祝好
>唐云
>________________________________
>From: ReignsDYL <19...@qq.com>
>Sent: Wednesday, June 26, 2019 14:22
>To: user-zh@flink.apache.org
>Subject: checkpoint stage size的问题
>
>各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
>size越来越大,请问是什么原因啊?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Re: checkpoint stage size的问题
Posted by Yun Tang <my...@live.com>.
你好
这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因:
1. 上游数据量增大。
2. window设置时间较长,尚未触发,导致window内积攒的数据比较大。
3. window的类型决定了所需要存储的state size较大。
可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations
祝好
唐云
________________________________
From: ReignsDYL <19...@qq.com>
Sent: Wednesday, June 26, 2019 14:22
To: user-zh@flink.apache.org
Subject: checkpoint stage size的问题
各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?
--
Sent from: http://apache-flink.147419.n8.nabble.com/