Posted to user-zh@flink.apache.org by yj h <hy...@gmail.com> on 2022/03/27 09:00:45 UTC
TaskManager OOM still occurs when computing UV even with PurgingTrigger
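I have a question about a TaskManager OOM. I am computing daily UV and use ContinuousEventTimeTrigger to fire every 3 minutes.
Setup:
2 machines with 2 cores each, 2 slots per TaskManager, parallelism 4. JobManager and TaskManager memory are left at the default configuration.
Troubleshooting steps so far:
1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))), and the job ran out of memory very quickly.
2. Added an evictor:
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
.evictor(TimeEvictor.of(Time.seconds(0),true))
3. Used .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
Approaches 2 and 3 also run out of memory. jstat shows full GCs happening continuously, and the checkpoint size keeps growing, up to a maximum of about 300 MB.
Analyzing the heap dump with MAT, the largest memory consumers are
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry and
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
Even with PurgingTrigger the job still hits OOM. How should I investigate this? Any help would be appreciated.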
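Code:
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

// ShoppingRecords and UserClickModel are plain bean (POJO) classes.
inputStream
    .filter(data -> "pv".equals(data.getBehavior()))
    // Key by (calendar date in Asia/Shanghai, item id).
    .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
        @Override
        public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
            Instant instant = Instant.ofEpochMilli(value.getTs());
            return Tuple2.of(
                LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
                value.getItemId());
        }
    })
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    // .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
    // .evictor(TimeEvictor.of(Time.seconds(0), true))
    .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
    .process(new ProcessWindowFunctionBitMap())
    // .addSink(new RedisSink<>(conf, new UvRedisSink()));
    .addSink(new PrintSinkFunction<>());

public static class ProcessWindowFunctionBitMap
        extends ProcessWindowFunction<ShoppingRecords, UserClickModel,
                Tuple2<LocalDate, Long>, TimeWindow> {

    // Running PV count and user-id bitmap for the current key.
    private transient ValueState<Integer> pvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> pvStateDescriptor =
                new ValueStateDescriptor<>("pv", Integer.class);
        ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor =
                new ValueStateDescriptor<>("bitMap",
                        TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));
        // Clean up expired state: entries expire one day after creation or last write.
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // Enable TTL on both states.
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
        pvState = this.getRuntimeContext().getState(pvStateDescriptor);
        bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
    }

    @Override
    public void process(Tuple2<LocalDate, Long> key,
                        Context context,
                        Iterable<ShoppingRecords> elements,
                        Collector<UserClickModel> out) throws Exception {
        // Current PV and UV state for this key; either may be absent or expired.
        Integer pv = pvState.value();
        Roaring64NavigableMap bitMap = bitMapState.value();
        if (pv == null) {
            pv = 0;
        }
        if (bitMap == null) {
            bitMap = new Roaring64NavigableMap();
        }
        for (ShoppingRecords record : elements) {
            pv = pv + 1;
            // Assumes the user id can be represented as a long.
            bitMap.add(record.getUser_id());
        }
        // Persist PV and the bitmap. The original snippet never wrote the bitmap
        // back, so UV only covered the elements of the current firing.
        pvState.update(pv);
        bitMapState.update(bitMap);

        UserClickModel result = new UserClickModel();
        result.setDate(key.f0.toString());
        result.setProduct(key.f1);
        result.setPv(pv);
        result.setUv(bitMap.getIntCardinality());
        out.collect(result);
    }
}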
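As a side check on the keying step, here is a minimal, self-contained sketch (plain JDK, no Flink dependency) of how the date part of the (date, itemId) key is derived, next to the start of the one-day window the same event would fall into. The class name DayKeyDemo, the method names, and the sample timestamp are hypothetical. windowStart re-implements epoch-aligned tumbling-window arithmetic for illustration only; it is an assumption about how a one-day window with a given offset is aligned, not a call into Flink.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DayKeyDemo {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final ZoneId SHANGHAI = ZoneId.of("Asia/Shanghai");

    // Date component of the key, computed the same way as in the KeySelector.
    static LocalDate shanghaiDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(SHANGHAI).toLocalDate();
    }

    // Start of the tumbling window containing epochMillis, for windows aligned
    // to the epoch and shifted by offsetMillis (illustrative re-implementation).
    static long windowStart(long epochMillis, long offsetMillis, long sizeMillis) {
        return epochMillis - Math.floorMod(epochMillis - offsetMillis, sizeMillis);
    }

    public static void main(String[] args) {
        // Hypothetical event at 01:00 on 2022-03-27, Shanghai time.
        long ts = ZonedDateTime.of(2022, 3, 27, 1, 0, 0, 0, SHANGHAI)
                .toInstant().toEpochMilli();
        System.out.println("key date:            " + shanghaiDate(ts));
        System.out.println("window start (0h):   "
                + Instant.ofEpochMilli(windowStart(ts, 0L, DAY_MS)));
        System.out.println("window start (-8h):  "
                + Instant.ofEpochMilli(windowStart(ts, -8L * 60 * 60 * 1000, DAY_MS)));
    }
}
```

Under these assumptions, a window without an offset starts at 00:00 UTC, so the key date and the window boundaries follow different day boundaries. If the two are meant to coincide, TumblingEventTimeWindows.of also accepts an offset as a second argument (for example Time.hours(-8) for this time zone); whether that is wanted here depends on the job's requirements.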
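Re: TaskManager OOM still occurs when computing UV even with PurgingTrigger
Posted by yj h <hy...@gmail.com>.
The cause was that the state accumulated within each 3-minute interval was too large. Either increasing the TM task heap or switching to the RocksDB state backend (RocksDBStateBackend) resolves it.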
yue ma <ma...@gmail.com> wrote on Mon, Mar 28, 2022 at 11:07:
> It looks like the main cause is that the state held on the heap is too large. I suggest switching to RocksDBStateBackend.
Re: TaskManager OOM still occurs when computing UV even with PurgingTrigger
Posted by yue ma <ma...@gmail.com>.
It looks like the main cause is that the state held on the heap is too large. I suggest switching to RocksDBStateBackend.
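Re: TaskManager OOM still occurs when computing UV even with PurgingTrigger
Posted by r pp <pr...@gmail.com>.
PurgingTrigger clears the window state after each firing. With a 3-minute firing interval, the window state holds 3 minutes of data. You can also confirm this from the GC logs.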
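yj h <hy...@gmail.com> wrote on Mon, Mar 28, 2022 at 12:08 AM:
> Hi, thank you. It was the TM memory shortage you described.
> Around 9 o'clock I analyzed the GC logs. GCViewer shows that at most about 300 MB remains after a full GC. Following advice from other posts, I raised the TM
> heap to 1 GB and tested it: with 500 MB of data in total, no OOM occurred.
>
> Two follow-up questions:
> How can I measure how much data arrives in 3 minutes?
> What should I consider when sizing the TM? In this scenario, is it enough that the memory can hold the data arriving within 3 minutes?
>
> Best regards!
>
> r pp <pr...@gmail.com> wrote on Sun, Mar 27, 2022 at 23:46:
>
> > Hi, because the trigger fires every 3 minutes, memory holds the data from the last 3 minutes, which is then deleted and replaced by new data. So what is the total data volume
> > within 3 minutes on your side, and roughly how much memory do you have? You could try adjusting the TM memory.
> >
>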
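The fire-and-purge behaviour described in this message can be sketched with a small plain-JDK simulation. This is a simplified model, not Flink code: it assumes events arrive in timestamp order, that a firing happens exactly when an event reaches the next interval boundary, and that purging empties the buffer completely. The class and method names are hypothetical.

```java
public class PurgingBufferDemo {

    // Returns the largest number of elements buffered at any moment.
    // With purge = true the buffer is emptied at every firing boundary;
    // with purge = false elements accumulate for the whole window.
    static int peakBuffered(long[] sortedEventTimesMs, long fireIntervalMs, boolean purge) {
        int buffered = 0;
        int peak = 0;
        long nextFire = fireIntervalMs;
        for (long ts : sortedEventTimesMs) {
            // Any firing whose boundary was reached happens before this element is added.
            while (ts >= nextFire) {
                if (purge) {
                    buffered = 0;
                }
                nextFire += fireIntervalMs;
            }
            buffered++;
            peak = Math.max(peak, buffered);
        }
        return peak;
    }

    public static void main(String[] args) {
        long threeMinutes = 3 * 60 * 1000L;
        // Hypothetical event times in milliseconds since the window start.
        long[] events = {0L, 60_000L, 120_000L, 180_000L, 240_000L, 400_000L};
        System.out.println("peak with purge:    " + peakBuffered(events, threeMinutes, true));
        System.out.println("peak without purge: " + peakBuffered(events, threeMinutes, false));
    }
}
```

In this model the peak with purging is bounded by the number of events in the busiest 3-minute interval, which is why the amount of data arriving per interval, rather than per day, is what the heap has to accommodate.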
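Re: TaskManager OOM still occurs when computing UV even with PurgingTrigger
Posted by yj h <hy...@gmail.com>.
Hi, thank you. It was the TM memory shortage you described.
Around 9 o'clock I analyzed the GC logs. GCViewer shows that at most about 300 MB remains after a full GC. Following advice from other posts, I raised the TM
heap to 1 GB and tested it: with 500 MB of data in total, no OOM occurred.
Two follow-up questions:
How can I measure how much data arrives in 3 minutes?
What should I consider when sizing the TM? In this scenario, is it enough that the memory can hold the data arriving within 3 minutes?
Best regards!
r pp <pr...@gmail.com> wrote on Sun, Mar 27, 2022 at 23:46:
> Hi, because the trigger fires every 3 minutes, memory holds the data from the last 3 minutes, which is then deleted and replaced by new data. So what is the total data volume
> within 3 minutes on your side, and roughly how much memory do you have? You could try adjusting the TM memory.
>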
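For the question of how much data arrives in 3 minutes, one rough approach is a back-of-the-envelope estimate from the input rate. The sketch below is plain JDK and every number in it is hypothetical: the record rate (which in practice might be read from an operator metric such as numRecordsInPerSecond), the per-record heap footprint, and the headroom factor are all assumptions to be replaced with measured values. It also ignores per-key state such as the bitmaps and timers.

```java
public class WindowBufferEstimate {

    // Approximate bytes buffered between two firings:
    // records per second * seconds per interval * bytes per record.
    static long bufferedBytes(long recordsPerSecond, long intervalSeconds, long bytesPerRecord) {
        return Math.multiplyExact(
                Math.multiplyExact(recordsPerSecond, intervalSeconds), bytesPerRecord);
    }

    // True if the estimate, scaled by a headroom factor for object overhead
    // and GC slack, still fits into the available heap.
    static boolean fits(long bufferedBytes, long heapBytes, double headroomFactor) {
        return bufferedBytes * headroomFactor <= heapBytes;
    }

    public static void main(String[] args) {
        long recordsPerSecond = 1_000L;   // hypothetical input rate
        long intervalSeconds = 180L;      // 3-minute trigger interval
        long bytesPerRecord = 200L;       // hypothetical on-heap size per record
        long heapBytes = 1L << 30;        // 1 GiB task heap, as in the test above
        double headroom = 3.0;            // hypothetical safety factor

        long estimate = bufferedBytes(recordsPerSecond, intervalSeconds, bytesPerRecord);
        System.out.println("estimated bytes per interval: " + estimate);
        System.out.println("fits in heap with headroom:   " + fits(estimate, heapBytes, headroom));
    }
}
```

An estimate like this only gives a lower bound for sizing; heap dumps and GC logs, as used earlier in the thread, remain the way to confirm the real footprint.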
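Re: TaskManager OOM still occurs when computing UV even with PurgingTrigger
Posted by r pp <pr...@gmail.com>.
Hi, because the trigger fires every 3 minutes, memory holds the data from the last 3 minutes, which is then deleted and replaced by new data. So what is the total data volume
within 3 minutes on your side, and roughly how much memory do you have? You could try adjusting the TM memory.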