Posted to user-zh@flink.apache.org by Yun Tang <my...@live.com> on 2019/05/02 08:58:30 UTC
Re: [State Backend] Question: checkpoint restore fails
Hi
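The stack trace shows that the stream being read was closed while the state was being restored. Unless HDFS itself had a problem, this is probably not the root cause. Are there any other exceptions in the logs?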
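The reasoning can be illustrated in plain JDK Java. If another component closes a stream, the thread that is still reading from it only sees "Stream closed", and the exception that triggered the close is reported somewhere else. As far as we know, Flink registers the streams it opens during a restore and closes them when the task is cancelled, but that detail is an assumption here. `CancelRegistry` is a hypothetical stand-in, not a Flink class, and `BufferedInputStream` plays the role of the HDFS input stream.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class StreamClosedDemo {

    // Hypothetical stand-in for a registry that closes every tracked stream on cancellation.
    // It is NOT a Flink class; it only mimics the idea.
    static final class CancelRegistry {
        private final List<Closeable> streams = new ArrayList<>();

        <T extends Closeable> T register(T stream) {
            streams.add(stream);
            return stream;
        }

        void cancelAll() throws IOException {
            for (Closeable stream : streams) {
                stream.close();
            }
        }
    }

    // Returns the message of the IOException seen by the reading thread, or null if the read succeeded.
    static String readAfterCancel() {
        CancelRegistry registry = new CancelRegistry();
        // BufferedInputStream stands in for the checkpoint file stream (an assumption for the demo).
        InputStream in = registry.register(
                new BufferedInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3})));
        try {
            // Some other failure cancels the task, which closes the stream under the reader.
            registry.cancelAll();
            in.read(); // the "restore" code keeps reading
            return null;
        } catch (IOException e) {
            // Only a symptom: the real cause is whatever triggered the cancellation.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterCancel()); // prints "Stream closed"
    }
}
```

This is why the other exceptions in the logs are the more useful place to look for the root cause.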
Best regards,
Yun Tang
Sent from my Xiaomi phone
On April 30, 2019 at 16:30, eric <er...@qq.com> wrote:
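Hi all,
I'm new to Flink and ran a program to test state checkpointing:
1) The data source is a socket, and the job uses a keyed state backend. I submitted the job and let it run for a while.
2) Then I closed the source socket, and the job went into the FAILED state.
3) After waiting a few seconds, I reopened the source socket.
4) Flink then reconnected to the socket and tried to restore the job, but the restore failed: the stored MapState was not restored.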
Environment:
Flink: 1.8.0
Flink Hadoop jar: flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
HDFS: hadoop2.6.0-cdh5.16.1
Running in standalone mode; the restore failed with both the filesystem and the RocksDB state backend.
Error log:
Caused by: java.io.IOException: Stream closed
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at org.apache.flink.types.StringValue.readString(StringValue.java:769)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
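Reading this trace from the bottom up: HeapKeyedStateBackendBuilder runs a HeapRestoreOperation, which reads the key-group data and deserializes each MapState value with MapSerializer. The stream is closed while StringSerializer is reading a String inside that map, so the failure happens partway through the state data, not when the checkpoint file is opened. This particular trace also comes from the heap-based backend, not from RocksDB. The following is a simplified, plain-JDK model of that entry-by-entry read, under explicit assumptions: the layout (entry count followed by key/value pairs) only approximates MapSerializer, `writeUTF`/`writeLong` replace Flink's real encodings, and closing the stream inside the loop is a hypothetical stand-in for whatever closed the HDFS stream.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapStateRestoreSketch {

    // Serializes one map value as: entry count, then (key, value) pairs.
    // writeUTF/writeLong are stand-ins, NOT the encodings Flink's serializers use.
    static byte[] write(Map<String, Long> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the map back entry by entry. When entry index closeBeforeEntry is reached,
    // the stream is closed first, simulating a cancellation in the middle of the restore.
    static String restore(byte[] snapshot, int closeBeforeEntry) {
        Map<String, Long> restored = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new ByteArrayInputStream(snapshot)))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                if (i == closeBeforeEntry) {
                    in.close(); // hypothetical concurrent close
                }
                // The trace fails while reading a String inside the map (StringSerializer).
                String key = in.readUTF();
                restored.put(key, in.readLong());
            }
            return "restored " + restored;
        } catch (IOException e) {
            return "entries restored before failure: " + restored.size() + " (" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("a", 1L);
        state.put("b", 2L);
        byte[] snapshot = write(state);
        System.out.println(restore(snapshot, Integer.MAX_VALUE)); // restored {a=1, b=2}
        System.out.println(restore(snapshot, 1)); // entries restored before failure: 1 (Stream closed)
    }
}
```

The partial result in the second call mirrors the situation in the trace: part of the MapState has already been read when the stream disappears underneath the reader.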