Posted to user@flink.apache.org by Andrey Zagrebin <an...@ververica.com> on 2019/05/16 07:46:51 UTC

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Hi,

Could you also post the job master logs, and ideally the full task manager logs?
This failure can be caused by some other, earlier failure.

Best,
Andrey

On Wed, May 15, 2019 at 2:48 PM PedroMrChaves <pe...@gmail.com>
wrote:

> Hello,
>
> Every once in a while our checkpoints fail with the following exception:
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>         ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>         ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>         ... 7 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc (inode 181723246) Holder DFSClient_NONMAPREDUCE_-10072319_1 does not have any open files.
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
>         at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
>         at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
>         at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
>         at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
>
>         at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>         at com.sun.proxy.$Proxy12.complete(Unknown Source)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
>         at sun.reflect.GeneratedMethodAccessor143.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>         at com.sun.proxy.$Proxy13.complete(Unknown Source)
>         at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
>         at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
>         at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
>         at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>         at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>         at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>         at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
>         ... 12 more
>
> What could be the problem?
>
> Flink version: 1.6.2
> Checkpointing configuration:
> Screenshot_2019-05-15_at_13.png
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-05-15_at_13.png>
>
> Average state size: ~56 MB
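>
> For reference, in code our checkpoint setup amounts to roughly the
> following (a sketch only; the exact interval and timeout values are the
> ones in the screenshot above, the numbers below are illustrative):
>
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointSetup {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Keep checkpoints on HDFS, under the path from the stack trace above.
>         env.setStateBackend(new FsStateBackend("hdfs:///flink/data/checkpoints"));
>
>         // Checkpoint every 10 s with exactly-once guarantees (illustrative values).
>         env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
>
>         // Abort checkpoints that take longer than one minute.
>         env.getCheckpointConfig().setCheckpointTimeout(60_000);
>
>         // Never run two checkpoints concurrently.
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>         // ... sources, the AGGREGATION-FILTER operator, sinks, then env.execute()
>     }
> }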
>
>
>
>
> -----
> Best Regards,
> Pedro Chaves
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
Hello, 

Thanks for the help.
I've attached the logs. Our cluster has 2 job managers (HA) and 4 task
managers. 

logs.tgz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/logs.tgz>  

Regards,
Pedro



-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
Hello Andrey,

The audit log doesn't contain anything that would point to the file being
deleted. The only thing worth mentioning is the following line:

2019-05-15 10:01:39,082 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: BLOCK* blk_1248714854_174974084 is COMMITTED but not COMPLETE(numNodes= 0 < minimum = 1) in file /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
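
If I read this correctly, COMMITTED but not COMPLETE means the NameNode has
registered the last block of the checkpoint file, but no DataNode has yet
reported a finalized replica, so the client's close() keeps retrying and can
eventually give up. Under that assumption, one mitigation we are considering
is raising the HDFS client's retry count for exactly this situation; a
minimal sketch (the retry value of 10 is a guess, not a tested setting):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsCloseRetries {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default is 5: how often the client retries the NameNode
        // complete()/addBlock() call while the previous block is still
        // COMMITTED but not COMPLETE.
        conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 10);
        try (FileSystem fs = FileSystem.get(conf)) {
            // Output streams opened through this FileSystem now retry
            // longer on close() before failing.
        }
    }
}

Since Flink builds its HDFS client from the cluster's Hadoop configuration,
in practice this would go into hdfs-site.xml rather than into job code.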

Regards,
Pedro Chaves



-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Pedro

Could you please share the audit log for the file
`/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc`?
It seems that this file did not exist, which caused the problem (maybe the
file was created and then deleted for some reason).
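
For reference, a delete of that file would appear in the NameNode audit log
as a line roughly of this form (standard audit-log fields; the timestamp,
user, and ip below are placeholders, not real values):

2019-05-15 10:01:00,000 INFO FSNamesystem.audit: allowed=true  ugi=flink (auth:SIMPLE)  ip=/10.0.0.1  cmd=delete  src=/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912  dst=null  perm=null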

Best,
Congxian


Andrey Zagrebin <an...@ververica.com> wrote on Thursday, May 16, 2019 at 3:47 PM:

> Hi,
>
> Could you also post the job master logs, and ideally the full task
> manager logs? This failure can be caused by some other, earlier failure.
>
> Best,
> Andrey