You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 邵志鹏 <bo...@163.com> on 2019/05/06 02:05:37 UTC

Re:Re: 使用hdfs保存checkpoint一段时间后报错


Hi Yun Tang


谢谢唐老师...


HDFS异常问题找到了,是我对hadoop集群重新格式化的时候,没有把SecondaryNameNode的tmp目录的内容删掉,导致格式化后,NameNode、DataNode中VERSION里面ClusterID与SecondaryNameNode的不一致,所以会不定时报出异常,问题已经解决啦。感谢~~


另外有个小问题请教一下:
Flink提供的默认EventTimeTrigger.java里面,在onElement(...)和onEventTime(...)两个方法被调用时才有机会TriggerResult.FIRE,而水印Watermark的增长基本都是晚于时间窗口endtime的,这样就导致必须有新的事件(或数据)到来才能推动TriggerResult.FIRE。


比如,时间窗口的endtime还没到自然时间点,endtime是2019-09-28 09:15:00,当前自然时间是2019-09-28 09:11:00,也就是endtime还没到,还差4分钟,随着自然时间的推进,数据流在2019-09-28 09:13:00中断了【暂时没有数据,不是程序崩溃】。
2019-09-28 09:11:00与2019-09-28 09:13:00之间是有数据的,继续随着自然时间的推进,自然时间到2019-09-28 09:15:00时并没有TriggerResult.FIRE,而是进一步随着自然时间的推进,自然时间到2019-09-28 09:20:00时,有新的数据进入Flink并被处理,这时【endtime是2019-09-28 09:15:00】的窗口才被FIRE,新的时间窗口又被往后推。


从而导致,TriggerResult.FIRE不及时,这样的话,如果数据流总是间隔断断续续,使用eventtime就没什么意义了【或者数据流本身不适合Flink】,只能使用ProcessingTime,所以有什么办法可以改进,当时间窗口的endtime到达自然时间点时就可以TriggerResult.FIRE。如果自定义Trigger,如何“嵌入”或者应用到FlinkSQL里面而不是StreamAPI上面呢...


也疑惑过,是不是时间的时区问题,使用FlinkSQL的时候,会把rowtime(eventtime)和proctime - 8个小时(UTC0)【而EventTimeTrigger.java相关的时间又是local的时区时间?...】,从而导致TriggerResult.FIRE不及时?
org.apache.calcite.runtime.SqlFunctions
publicstaticjava.sql.Timestamp internalToTimestamp(long v) {
    returnnewjava.sql.Timestamp(v -LOCAL_TZ.getOffset(v));
  }


经过测试,水印的时间戳直接使用System.currentTimeMillis()是可以及时FIRE的,但是消费kafka就不能从上一次的offset开始了,始终是处理flink程序启动后kafka新接收到的数据,从头消费和Flink重启接着消费就失效了。


目前还没有最终解决(不知道大家生产环境中使用eventtime和flinksql的时候是怎么处理的,是就这样忽略掉还是我理解错了...好别扭)。


谢谢指导。


邵志鹏

在 2019-04-30 16:12:52,"Yun Tang" <my...@live.com> 写道:
>Hi 志鹏
>
>核心原因是HDFS的问题
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
>
>在出现问题的时候,观察一下集群HDFS的情况,以及相关的日志。
>也许这个stackoverflow的回答[1] 能帮助到你。
>
>
>[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025<https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025>
>
>祝好
>唐云
>
>
>________________________________
>From: 邵志鹏 <bo...@163.com>
>Sent: Tuesday, April 30, 2019 15:26
>To: user-zh@flink.apache.org
>Subject: 使用hdfs保存checkpoint一段时间后报错
>
>使用hdfs保存checkpoint一段时间后报错,自动重启后正常运行一段时间后继续报同样的错
>
>Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://master:9000/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff in order to obtain the stream state handle
>        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>        at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:765)
>        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
>        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>        ... 7 more
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
>        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1726)
>        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2567)
>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:829)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
>        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
>        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:422)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
>
>        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
>        at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>        at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>        at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
>        at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
>        at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1838)
>        at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1638)
>        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:704)

RE: Re:Re: 使用hdfs保存checkpoint一段时间后报错

Posted by Shi Quan <qu...@outlook.com>.
Hi,



你对eventTime和窗口的分析很赞。

如果你的核心诉求是数据长时间中断情况下,希望不要丢弃处理老的数据。allowLateness应该能解决你的问题。



祝好

石权



Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: 邵志鹏 <bo...@163.com>
Sent: Monday, May 6, 2019 10:05:37 AM
To: user-zh@flink.apache.org
Subject: Re:Re: 使用hdfs保存checkpoint一段时间后报错



Hi Yun Tang


谢谢唐老师...


HDFS异常问题找到了,是我对hadoop集群重新格式化的时候,没有把SecondaryNameNode的tmp目录的内容删掉,导致格式化后,NameNode、DataNode中VERSION里面ClusterID与SecondaryNameNode的不一致,所以会不定时报出异常,问题已经解决啦。感谢~~


另外有个小问题请教一下:
Flink提供的默认EventTimeTrigger.java里面,在onElement(...)和onEventTime(...)两个方法被调用时才有机会TriggerResult.FIRE,而水印Watermark的增长基本都是晚于时间窗口endtime的,这样就导致必须有新的事件(或数据)到来才能推动TriggerResult.FIRE。


比如,时间窗口的endtime还没到自然时间点,endtime是2019-09-28 09:15:00,当前自然时间是2019-09-28 09:11:00,也就是endtime还没到,还差4分钟,随着自然时间的推进,数据流在2019-09-28 09:13:00中断了【暂时没有数据,不是程序崩溃】。
2019-09-28 09:11:00与2019-09-28 09:13:00之间是有数据的,继续随着自然时间的推进,自然时间到2019-09-28 09:15:00时并没有TriggerResult.FIRE,而是进一步随着自然时间的推进,自然时间到2019-09-28 09:20:00时,有新的数据进入Flink并被处理,这时【endtime是2019-09-28 09:15:00】的窗口才被FIRE,新的时间窗口又被往后推。


从而导致,TriggerResult.FIRE不及时,这样的话,如果数据流总是间隔断断续续,使用eventtime就没什么意义了【或者数据流本身不适合Flink】,只能使用ProcessingTime,所以有什么办法可以改进,当时间窗口的endtime到达自然时间点时就可以TriggerResult.FIRE。如果自定义Trigger,如何“嵌入”或者应用到FlinkSQL里面而不是StreamAPI上面呢...


也疑惑过,是不是时间的时区问题,使用FlinkSQL的时候,会把rowtime(eventtime)和proctime - 8个小时(UTC0)【而EventTimeTrigger.java相关的时间又是local的时区时间?...】,从而导致TriggerResult.FIRE不及时?
org.apache.calcite.runtime.SqlFunctions
publicstaticjava.sql.Timestamp internalToTimestamp(long v) {
    returnnewjava.sql.Timestamp(v -LOCAL_TZ.getOffset(v));
  }


经过测试,水印的时间戳直接使用System.currentTimeMillis()是可以及时FIRE的,但是消费kafka就不能从上一次的offset开始了,始终是处理flink程序启动后kafka新接收到的数据,从头消费和Flink重启接着消费就失效了。


目前还没有最终解决(不知道大家生产环境中使用eventtime和flinksql的时候是怎么处理的,是就这样忽略掉还是我理解错了...好别扭)。


谢谢指导。


邵志鹏

在 2019-04-30 16:12:52,"Yun Tang" <my...@live.com> 写道:
>Hi 志鹏
>
>核心原因是HDFS的问题
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
>
>在出现问题的时候,观察一下集群HDFS的情况,以及相关的日志。
>也许这个stackoverflow的回答[1] 能帮助到你。
>
>
>[1] https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025<https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025>
>
>祝好
>唐云
>
>
>________________________________
>From: 邵志鹏 <bo...@163.com>
>Sent: Tuesday, April 30, 2019 15:26
>To: user-zh@flink.apache.org
>Subject: 使用hdfs保存checkpoint一段时间后报错
>
>使用hdfs保存checkpoint一段时间后报错,自动重启后正常运行一段时间后继续报同样的错
>
>Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://master:9000/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff in order to obtain the stream state handle
>        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>        at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:765)
>        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
>        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>        ... 7 more
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1).  There are 3 datanode(s) running and no node(s) are excluded in this operation.
>        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1726)
>        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2567)
>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:829)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
>        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
>        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:422)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
>
>        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
>        at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>        at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>        at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
>        at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>        at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
>        at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1838)
>        at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1638)
>        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:704)