You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 王默 <ws...@163.com> on 2020/11/27 09:22:00 UTC
带有状态的算子保存checkpoint失败
Hi,请教各位一个困扰了几天的问题,
我在项目中使用了状态保存一些数据用于去重,开启checkpoint后在web上发现带有状态算子无法保存状态数据到checkpoint,导致整个checkpoint提交失败,偶尔第一次能成功提交checkpoint,但后续提交全部失败
StateBackend试过MemoryStateBackend和FsStateBackend都不行,FsStateBackend使用的是hdfs
且根据jobid到对应taskmanager下的日志中没有发现任何相关的异常信息
使用的flink版本是1.11.2
附件为web上checkpoint失败的截图,使用的是去掉业务逻辑后的简单测试代码
测试代码部分:
public class TestStateProcess extends KeyedProcessFunction<String, NLMessage, NLMessage> {
private transient ValueState<Integer> userCount;
@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("userId", TypeInformation.of(new TypeHint<Integer>() {}));
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);
userCount = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void processElement(NLMessage value, Context ctx, Collector<NLMessage> out) throws Exception {
try {
if (null == userCount.value()) {
userCount.update(1);
} else {
userCount.update(userCount.value() + 1);
}
if (userCount.value() > 10) {
System.out.println(new Date() + " userId: " + ctx.getCurrentKey() + " count: " + userCount.value());
}
out.collect(value);
} catch (IOException e) {
e.printStackTrace();
}
}
}
checkpoint配置:
env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
万分感谢!