Posted to user-zh@flink.apache.org by 王默 <ws...@163.com> on 2020/11/27 09:22:00 UTC

Checkpoints fail for a stateful operator

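Hi all, I'd like to ask about a problem that has been bothering me for several days.
In my project I use keyed state to store some data for deduplication. After enabling checkpointing, the web UI shows that the stateful operator cannot write its state to the checkpoint, which makes the whole checkpoint fail. Occasionally the first checkpoint succeeds, but every checkpoint after that fails.
I have tried both MemoryStateBackend and FsStateBackend as the StateBackend, and neither works; the FsStateBackend was backed by HDFS.
Searching the logs of the corresponding TaskManager by job ID turned up no related exception messages.
The Flink version is 1.11.2.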


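The attachment is a screenshot of the failed checkpoint in the web UI. The test code below is a simplified version with the business logic removed.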


Test code:
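import java.util.Date;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by userId (String); NLMessage is the application's own message type.
// Counts messages per key in a ValueState that expires 10 minutes after its last write.
public class TestStateProcess extends KeyedProcessFunction<String, NLMessage, NLMessage> {
    private transient ValueState<Integer> userCount;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("userId", Types.INT);

        // 10-minute TTL, refreshed on create and write; expired values are never returned
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        descriptor.enableTimeToLive(ttlConfig);

        // Let exceptions propagate instead of swallowing them: a failed state
        // registration should fail the task, not leave userCount null
        userCount = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(NLMessage value, Context ctx, Collector<NLMessage> out) throws Exception {
        // Read the state once; null means no value yet (or the value has expired)
        Integer current = userCount.value();
        int next = (current == null) ? 1 : current + 1;
        userCount.update(next);

        if (next > 10) {
            System.out.println(new Date() + " userId: " + ctx.getCurrentKey() + " count: " + next);
        }

        out.collect(value);
    }
}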
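To make the TTL settings in the test code concrete (OnCreateAndWrite: only creating or writing a value restarts its 10-minute timer; NeverReturnExpired: an expired value reads as null), here is a plain-Java simulation of the same counting logic, without Flink. It is a sketch under assumptions: the class `TtlCounterSketch`, its methods, and the explicit millisecond clock are hypothetical, and Flink's actual TTL machinery (timestamps stored next to each value, cleanup strategies) is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Flink code) of a per-key counter whose
 * entries expire a fixed time after their last write, mimicking
 * UpdateType.OnCreateAndWrite + StateVisibility.NeverReturnExpired.
 */
public class TtlCounterSketch {
    // Stored count plus the time of its last create/write
    private static final class Entry {
        final int count;
        final long lastWriteMillis;

        Entry(int count, long lastWriteMillis) {
            this.count = count;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> state = new HashMap<>();

    public TtlCounterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Like ValueState.value(): null if absent or expired; reading does not refresh the timer
    public Integer value(String key, long nowMillis) {
        Entry e = state.get(key);
        if (e == null || nowMillis - e.lastWriteMillis >= ttlMillis) {
            return null;
        }
        return e.count;
    }

    // Like ValueState.update(): every write restarts the TTL timer
    public void update(String key, int count, long nowMillis) {
        state.put(key, new Entry(count, nowMillis));
    }

    // Same logic as processElement: start at 1, otherwise add 1; returns the new count
    public int increment(String key, long nowMillis) {
        Integer current = value(key, nowMillis);
        int next = (current == null) ? 1 : current + 1;
        update(key, next, nowMillis);
        return next;
    }

    public static void main(String[] args) {
        TtlCounterSketch counter = new TtlCounterSketch(10 * 60 * 1000L);
        System.out.println(counter.increment("u1", 0L));              // 1
        System.out.println(counter.increment("u1", 5 * 60 * 1000L));  // 2 (within TTL)
        System.out.println(counter.increment("u1", 15 * 60 * 1000L)); // 1 (10 min after last write: expired)
    }
}
```

Because reads do not refresh the timer under OnCreateAndWrite, a user who keeps sending messages keeps the counter alive only through the `update` call on each message.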


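Checkpoint configuration:
// MemoryStateBackend with the default max state size (5 MiB) and synchronous snapshots (asynchronousSnapshots = false)
env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
// Trigger an exactly-once checkpoint every 60 s
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
// Discard any checkpoint that has not completed within 60 s
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
// Keep the externalized checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);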
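With this configuration a checkpoint is triggered every 60 s and discarded if it has not completed within 60 s. Flink marks a checkpoint complete only after every task, including the stateful operator, has acknowledged it, so a single task that acknowledges late or not at all fails the entire checkpoint, which matches the symptom described above. The plain-Java model below sketches that rule; it is not Flink's API. `CheckpointOutcomeSketch`, `Outcome`, and `evaluate` are hypothetical names, the acknowledgement delays in `main` are made-up illustrative inputs rather than measurements, and explicitly declined checkpoints are not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical simplified model (not Flink code): a checkpoint completes only
 * if every task acknowledges it no later than the timeout.
 */
public class CheckpointOutcomeSketch {
    enum Outcome { COMPLETED, EXPIRED }

    /**
     * @param ackDelaysMillis task name -> milliseconds from trigger to acknowledgement,
     *                        or null if the task never acknowledged
     * @param timeoutMillis   checkpoint timeout (60 * 1000 in the configuration above)
     */
    static Outcome evaluate(Map<String, Long> ackDelaysMillis, long timeoutMillis) {
        for (Long delay : ackDelaysMillis.values()) {
            // One missing or late acknowledgement fails the whole checkpoint
            if (delay == null || delay > timeoutMillis) {
                return Outcome.EXPIRED;
            }
        }
        return Outcome.COMPLETED;
    }

    public static void main(String[] args) {
        long timeout = 60 * 1000L;

        // Illustrative delays only
        Map<String, Long> allAck = new LinkedHashMap<>();
        allAck.put("source", 200L);
        allAck.put("TestStateProcess", 1_500L);
        allAck.put("sink", 300L);
        System.out.println(evaluate(allAck, timeout)); // COMPLETED

        Map<String, Long> statefulSilent = new LinkedHashMap<>(allAck);
        statefulSilent.put("TestStateProcess", null); // stateful operator never acknowledges
        System.out.println(evaluate(statefulSilent, timeout)); // EXPIRED
    }
}
```

In this model the other tasks acknowledging quickly does not help: the outcome is decided by the slowest (or silent) task.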


Many thanks!