Posted to user-zh@flink.apache.org by yj h <hy...@gmail.com> on 2022/03/27 09:00:45 UTC

TaskManager OOM when computing UV, even with PurgingTrigger

I'd like to ask about a TaskManager OOM problem. I'm computing daily UV (unique visitors), using ContinuousEventTimeTrigger to fire every 3 minutes.

Configuration:
2 machines with 2 cores each, 2 slots per TaskManager, parallelism 4. All other JobManager and TaskManager memory settings are the defaults.

Troubleshooting steps so far:
1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))), and the job ran out of memory very quickly.
2. Added an evictor:
               .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
               .evictor(TimeEvictor.of(Time.seconds(0), true))
3. Used .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
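How step 3 differs from step 1 can be sketched in plain Java. `TriggerResultSketch` below is a simplified local stand-in for Flink's `TriggerResult` (not the Flink class), and `wrap` mirrors what `PurgingTrigger` does with its nested trigger's decision: any result that fires becomes FIRE_AND_PURGE, so the buffered window contents are cleared after the window function runs. Purging only clears the window's buffered elements; state the function manages itself, such as `pvState` and `bitMapState`, is not touched.

```java
/** Simplified local stand-in for Flink's TriggerResult (not the real class). */
enum TriggerResultSketch {
    CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

    final boolean fire;
    final boolean purge;

    TriggerResultSketch(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

/** Mirrors the idea behind PurgingTrigger: every FIRE of the nested trigger becomes FIRE_AND_PURGE. */
final class PurgingTriggerSketch {
    static TriggerResultSketch wrap(TriggerResultSketch nested) {
        return nested.fire ? TriggerResultSketch.FIRE_AND_PURGE : nested;
    }

    public static void main(String[] args) {
        for (TriggerResultSketch r : TriggerResultSketch.values()) {
            System.out.println(r + " -> " + wrap(r));
        }
    }
}
```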

Approaches 2 and 3 also run out of memory. jstat shows full GCs happening continuously, and the checkpoint size keeps growing, up to about 300 MB at most.

Analyzing the heap dump with MAT, the classes taking the most memory are
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry and
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
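The `TimerHeapInternalTimer` objects in the dump are event-time timers, which the heap backend keeps per key and window. As a rough model of `ContinuousEventTimeTrigger` (an approximation, not its source code), a key that receives data in a window holds a timer at the window end and one at the next 3-minute boundary, aligned to multiples of the interval and capped at the window end. Since the stream is keyed by (date, itemId), the number of pending timers should track the number of distinct keys seen in the day rather than the 3-minute data volume. The class and method names below are hypothetical.

```java
/** Hypothetical helper approximating how a continuous event-time trigger aligns its fire times. */
final class ContinuousFireTimeSketch {
    static final long INTERVAL_MS = 3 * 60 * 1000L;      // 3 minutes, as in the job
    static final long WINDOW_MS = 24 * 60 * 60 * 1000L;  // 1-day tumbling window, epoch-aligned

    /** Last millisecond (max timestamp) of the epoch-aligned 1-day window containing ts. */
    static long windowMaxTimestamp(long ts) {
        long start = ts - Math.floorMod(ts, WINDOW_MS);
        return start + WINDOW_MS - 1;
    }

    /** Next fire time: the interval boundary after ts, capped at the window end. */
    static long nextFireTime(long ts) {
        long alignedStart = ts - Math.floorMod(ts, INTERVAL_MS);
        return Math.min(alignedStart + INTERVAL_MS, windowMaxTimestamp(ts));
    }

    public static void main(String[] args) {
        long ts = 1_648_371_700_000L; // an arbitrary event timestamp in ms
        System.out.println("next fire at " + nextFireTime(ts)
                + ", window ends at " + windowMaxTimestamp(ts));
    }
}
```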

I still get OOM even with PurgingTrigger. How should I troubleshoot this? Any help would be appreciated.

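Code:
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

// ShoppingRecords and UserClickModel are plain bean classes
inputStream
        .filter(data -> "pv".equals(data.getBehavior()))
        // Key by (calendar date in Asia/Shanghai, itemId)
        .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
            @Override
            public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
                Instant instant = Instant.ofEpochMilli(value.getTs());
                return Tuple2.of(
                        LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
                        value.getItemId());
            }
        })
        .window(TumblingEventTimeWindows.of(Time.days(1)))
//      .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
//      .evictor(TimeEvictor.of(Time.seconds(0), true))
        // Fire every 3 minutes of event time and purge the buffered elements after each firing
        .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
        .process(new ProcessWindowFunctionBitMap())
//      .addSink(new RedisSink<>(conf, new UvRedisSink()));
        .addSink(new PrintSinkFunction<>());

public static class ProcessWindowFunctionBitMap
        extends ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow> {

    private transient ValueState<Integer> pvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> pvStateDescriptor =
                new ValueStateDescriptor<>("pv", Integer.class);
        ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor =
                new ValueStateDescriptor<>("bitMap",
                        TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));

        // Expire state one day after it was created or last written
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // Enable TTL on both states
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);

        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bitMapState = getRuntimeContext().getState(bitMapStateDescriptor);
    }

    @Override
    public void process(Tuple2<LocalDate, Long> key,
                        Context context,
                        Iterable<ShoppingRecords> elements,
                        Collector<UserClickModel> out) throws Exception {
        // Running PV count and UV bitmap for this key so far
        Integer pv = pvState.value();
        Roaring64NavigableMap bitMap = bitMapState.value();
        if (bitMap == null) {
            bitMap = new Roaring64NavigableMap();
        }
        if (pv == null) {
            pv = 0;
        }

        // With PurgingTrigger, elements holds only the records since the last firing
        for (ShoppingRecords record : elements) {
            pv = pv + 1;
            // Assumes the user ID can be represented as a long
            long uid = record.getUser_id();
            bitMap.add(uid);
        }

        // Persist PV and the bitmap so the next firing continues from them
        pvState.update(pv);
        bitMapState.update(bitMap);

        UserClickModel model = new UserClickModel();
        model.setDate(key.f0.toString());
        model.setProduct(key.f1);
        model.setPv(pv);
        model.setUv(bitMap.getIntCardinality());
        out.collect(model);
    }
}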
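One detail worth checking in the code above: the key uses the calendar date in Asia/Shanghai, while `TumblingEventTimeWindows.of(Time.days(1))` aligns day windows to the epoch, i.e. to UTC midnight (08:00 in Shanghai). Flink also has an overload that takes an offset, `TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))`, which is the usual way to align day windows with China Standard Time. The stdlib sketch below reproduces the idea behind Flink's window-start-with-offset calculation so both alignments can be compared; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;

final class WindowAlignmentSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the 1-day tumbling window containing ts, shifted by offsetMs. */
    static long windowStart(long ts, long offsetMs) {
        return ts - Math.floorMod(ts - offsetMs, DAY_MS);
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        // 02:00 on 2022-03-27 in Shanghai
        long ts = Instant.parse("2022-03-26T18:00:00Z").toEpochMilli();
        long utcAligned = windowStart(ts, 0L);
        long shanghaiAligned = windowStart(ts, -8L * 60 * 60 * 1000);
        System.out.println("no offset:  " + Instant.ofEpochMilli(utcAligned).atZone(shanghai));
        System.out.println("offset -8h: " + Instant.ofEpochMilli(shanghaiAligned).atZone(shanghai));
    }
}
```

Without an offset the window containing this event starts at 08:00 on March 26 Shanghai time; with the -8h offset it starts at Shanghai midnight on March 27, matching the date in the key.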

Re: TaskManager OOM when computing UV, even with PurgingTrigger

Posted by yj h <hy...@gmail.com>.
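It was caused by the state accumulated within the 3 minutes being too large. Both increasing the TM task heap and switching to the RocksDB state backend solve it.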

yue ma <ma...@gmail.com> wrote on Mon, Mar 28, 2022 at 11:07:

> It looks like the state held on the heap is simply too large. I'd suggest switching to the RocksDB state backend.

Re: TaskManager OOM when computing UV, even with PurgingTrigger

Posted by yue ma <ma...@gmail.com>.
It looks like the main cause is that the state held on the heap is too large. I'd suggest switching to the RocksDB state backend.

Re: TaskManager OOM when computing UV, even with PurgingTrigger

Posted by r pp <pr...@gmail.com>.
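PurgingTrigger clears the window state after each firing. With a 3-minute trigger, the window state holds 3 minutes' worth of data; you can also confirm this from the GC behavior.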
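This can be checked with a small simulation. The sketch below is a plain-Java model, not Flink code, and assumes a single key with in-order events (watermark equal to event time). It buffers elements the way window state does and treats each crossing of a 3-minute boundary as a firing. With `purgeOnFire = true` the buffer is cleared at each firing, so the peak equals the most records arriving between two firings; the `false` case corresponds to step 1 of the question, where `ContinuousEventTimeTrigger` fires without purging and the window keeps every element until the day window closes. The timestamps are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a window buffer that fires once per interval (not Flink code). */
final class PurgingBufferModel {
    /**
     * Feeds in-order event timestamps for one key and returns the largest number of
     * elements held in the buffer at any point.
     */
    static int peakBuffered(long[] sortedEventTimes, long intervalMs, boolean purgeOnFire) {
        List<Long> buffer = new ArrayList<>();
        long currentBucket = Long.MIN_VALUE;
        int peak = 0;
        for (long ts : sortedEventTimes) {
            long bucket = Math.floorDiv(ts, intervalMs);
            if (bucket != currentBucket) {
                // Event time passed an interval boundary: the window function fires here
                if (purgeOnFire) {
                    buffer.clear(); // FIRE_AND_PURGE: drop elements already processed
                }
                currentBucket = bucket;
            }
            buffer.add(ts);
            peak = Math.max(peak, buffer.size());
        }
        return peak;
    }

    public static void main(String[] args) {
        long interval = 3 * 60 * 1000L;
        long[] events = {0L, 1_000L, 2_000L, 180_000L, 200_000L, 360_000L}; // made-up timestamps
        System.out.println("peak with purging:    " + peakBuffered(events, interval, true));
        System.out.println("peak without purging: " + peakBuffered(events, interval, false));
    }
}
```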

Re: TaskManager OOM when computing UV, even with PurgingTrigger

Posted by yj h <hy...@gmail.com>.
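Hi, thank you. It was indeed the insufficient TM memory you mentioned.
Around 9 o'clock I analyzed the GC log. GCViewer showed that at most about 300 MB remained after a full GC. Following advice from a post I read, I raised the TM heap to 1 GB, and in a test with 500 MB of data in total there was no more OOM.

One more question:
How can I measure how much data arrives within 3 minutes?
What should TM sizing be based on? In this scenario, is it enough that the data arriving within 3 minutes fits in memory?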

best regards!
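A back-of-the-envelope way to approach the sizing question: per slot, the heap has to hold the records buffered between two firings plus the long-lived per-key state (PV counters, bitmaps, trigger state, timers), with some headroom so GC is not running near full occupancy. The sketch below only multiplies these out; every input (event rate, heap bytes per buffered record, keys per slot, bytes per key, safety factor) is a hypothetical placeholder to be replaced with measured values, for example from a heap dump analyzed in MAT.

```java
/** Rough heap estimate for a purging 3-minute trigger; all inputs are hypothetical. */
final class HeapSizingSketch {
    static long estimateBytesPerSlot(double eventsPerSecond, long intervalSeconds,
                                     long bytesPerRecord, long keysPerSlot,
                                     long bytesPerKey, int parallelism) {
        // Records buffered between two firings, assuming an even spread over subtasks
        double bufferedRecords = eventsPerSecond * intervalSeconds / parallelism;
        double bufferBytes = bufferedRecords * bytesPerRecord;
        // Long-lived per-key state: PV counter, bitmap, trigger state and timers
        double keyStateBytes = (double) keysPerSlot * bytesPerKey;
        return (long) Math.ceil(bufferBytes + keyStateBytes);
    }

    public static void main(String[] args) {
        long perSlot = estimateBytesPerSlot(
                2_000,   // hypothetical events per second
                180,     // 3-minute trigger interval
                500,     // hypothetical heap bytes per buffered record
                100_000, // hypothetical distinct (date, itemId) keys per slot
                200,     // hypothetical bytes of state per key
                4);      // parallelism used by the job
        int slotsPerTaskManager = 2;
        double safetyFactor = 2.0; // hypothetical GC headroom
        double heapMb = perSlot * slotsPerTaskManager * safetyFactor / (1024.0 * 1024.0);
        System.out.printf("estimated task heap per TaskManager: %.0f MB%n", heapMb);
    }
}
```

With these placeholder inputs it prints an estimate of 248 MB; the structure of the calculation matters more than the number.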

Re: TaskManager OOM when computing UV, even with PurgingTrigger

Posted by r pp <pr...@gmail.com>.
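Hi~ Because the trigger fires every 3 minutes, memory holds the data of those 3 minutes, which is then deleted and refilled. So how much data arrives in total within 3 minutes on your side, and roughly how much memory do you have? You can try adjusting the TM memory.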
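One way to answer "how much data arrives within 3 minutes" offline is to bucket event timestamps by the trigger interval and look at the largest bucket, which approximates the peak number of records a purging 3-minute trigger has to buffer. The plain-Java sketch below does this over a list of timestamps; the class and method names are hypothetical, and the sample timestamps are made up.

```java
import java.util.Map;
import java.util.TreeMap;

final class ArrivalsPerInterval {
    /** Counts events per interval bucket, keyed by bucket start time in ms. */
    static TreeMap<Long, Integer> countPerBucket(long[] eventTimesMs, long intervalMs) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimesMs) {
            long bucketStart = ts - Math.floorMod(ts, intervalMs);
            counts.merge(bucketStart, 1, Integer::sum);
        }
        return counts;
    }

    /** Largest bucket count, i.e. the busiest interval. */
    static int maxPerBucket(long[] eventTimesMs, long intervalMs) {
        int max = 0;
        for (int c : countPerBucket(eventTimesMs, intervalMs).values()) {
            max = Math.max(max, c);
        }
        return max;
    }

    public static void main(String[] args) {
        long interval = 3 * 60 * 1000L;
        long[] sample = {0L, 60_000L, 170_000L, 200_000L, 400_000L}; // made-up timestamps
        for (Map.Entry<Long, Integer> e : countPerBucket(sample, interval).entrySet()) {
            System.out.println("bucket starting at " + e.getKey() + " ms: " + e.getValue() + " events");
        }
        System.out.println("busiest 3-minute bucket: " + maxPerBucket(sample, interval));
    }
}
```

These counts cover the whole stream; if keys are spread evenly, each of the 4 parallel subtasks would see roughly a quarter of them.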