Posted to user-zh@flink.apache.org by Yun Tang <my...@live.com> on 2019/05/02 08:58:30 UTC

Re: [State Backend] A question: checkpoint recovery fails

Hi,
The stack trace shows that the input stream was closed while the state was being read back. Unless HDFS itself had a problem, this is probably not the root cause. Are there any other exceptions in the log?

Best regards,
Yun Tang



Sent from my Mi phone
On 2019/04/30 16:30, eric <er...@qq.com> wrote:

Hi all,


I am new to Flink and ran a small program to test state checkpointing:
1) The data source is a socket and the job uses a keyed state backend; I submit the job and let it run for a while.
2) Then I close the source socket, which puts the job into the FAILED state.
3) After a few seconds I reopen the source socket.
4) Flink reconnects to the socket and tries to recover the job, but recovery fails: the stored MapState is not restored successfully.
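The original post does not include the test program, so the following is only a hypothetical sketch of a job matching the steps above (Flink 1.8 DataStream API): a socket source, a keyed stream, and a per-key MapState that should be restored from the checkpoint after the job recovers. The class names, host/port, and "key,value" input format are all assumptions.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketMapStateJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

        env.socketTextStream("localhost", 9999) // the job fails when this socket closes
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String line) {
                   return line.split(",")[0]; // assumed "key,value" input format
               }
           })
           .flatMap(new CountPerKey())
           .print();

        env.execute("socket-mapstate-checkpoint-test");
    }

    // Keeps a per-key MapState that is expected to survive a restore from checkpoint.
    public static class CountPerKey extends RichFlatMapFunction<String, String> {
        private transient MapState<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", String.class, Long.class));
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            Long previous = counts.get(line);
            long updated = previous == null ? 1L : previous + 1L;
            counts.put(line, updated);
            out.collect(line + " seen " + updated + " times");
        }
    }
}
```

With a job like this, closing and reopening the socket (e.g. `nc -lk 9999`) reproduces the failure/restart cycle described in the steps above.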


Environment:
    Flink: 1.8.0
    Flink Hadoop bundle: flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
    HDFS: hadoop2.6.0-cdh5.16.1
    Running in standalone mode; recovery fails with both the filesystem and the rocksdb state backend.
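For reference, in a standalone cluster the state backend is typically set in flink-conf.yaml. A minimal sketch follows; the namenode address and HDFS paths are placeholders, not taken from the original post.

```yaml
# Hypothetical flink-conf.yaml fragment; adjust host and paths to your cluster.
state.backend: rocksdb            # or: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.savepoints.dir: hdfs://namenode:8020/flink/savepoints
```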



The error log:


Caused by: java.io.IOException: Stream closed
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
        at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
        at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
        at org.apache.flink.types.StringValue.readString(StringValue.java:769)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)