Posted to user@flink.apache.org by PedroMrChaves <pe...@gmail.com> on 2019/05/15 12:38:33 UTC

Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Hello,

Every once in a while our checkpoints fail with the following exception:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 65912 for operator AGGREGATION-FILTER (2/2).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
	... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
	at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc (inode 181723246) Holder DFSClient_NONMAPREDUCE_-10072319_1 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy12.complete(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
	at sun.reflect.GeneratedMethodAccessor143.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
	at com.sun.proxy.$Proxy13.complete(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
	... 12 more

What could be the problem?

Flink version: 1.6.2
Checkpointing configuration:
Screenshot_2019-05-15_at_13.png
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-05-15_at_13.png>  
Average state size: ~56 MB




-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
Hello, 

Thanks for the help.
I've attached the logs. Our cluster has 2 job managers (HA) and 4 task
managers. 

logs.tgz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/logs.tgz>  

Regards,
Pedro




Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
Hello Andrey,

The audit log doesn't have anything that would point to it being deleted.
The only thing worth mentioning is the following line.

2019-05-15 10:01:39,082 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: BLOCK* blk_1248714854_174974084 is COMMITTED but not COMPLETE(numNodes= 0 <  minimum = 1) in file /flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc

Regards,
Pedro Chaves




Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Pedro,

Could you please share the HDFS audit log for the file
`/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc`?
It seems the problem is that this file no longer existed when the stream was
closed (maybe the file was created and then deleted for some reason).
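
Once the audit log is available, the entries for that path can be pulled out with a short script. The following is only a sketch: it assumes the default HDFS audit layout of tab-separated `key=value` fields (`allowed`, `ugi`, `ip`, `cmd`, `src`, `dst`, `perm`) after a 23-character timestamp. The function name `audit_entries` and the sample entries (job and file names included) are made up for illustration and are not taken from this cluster.

```python
import re

# HDFS audit entries are tab-separated key=value fields after the log prefix.
FIELD = re.compile(r"(\w+)=([^\t\n]*)")


def audit_entries(lines, path):
    """Yield (timestamp, cmd, src) for entries whose src is `path` or one of its parent directories."""
    for line in lines:
        fields = dict(FIELD.findall(line))
        src = fields.get("src")
        if not src:
            continue  # not an audit entry
        # A delete of a parent directory removes the file as well, so match parents too.
        if src == path or path.startswith(src.rstrip("/") + "/"):
            yield line[:23], fields.get("cmd"), src


# Hypothetical sample entries, written in the assumed audit format.
SAMPLE = [
    "2019-05-15 10:00:01,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\tip=/10.0.0.1\tcmd=create\tsrc=/flink/data/checkpoints/job-a/chk-1/file-a\tdst=null\tperm=flink:hdfs:rw-r--r--\tproto=rpc",
    "2019-05-15 10:00:02,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\tip=/10.0.0.2\tcmd=create\tsrc=/flink/data/checkpoints/job-a/chk-2/file-b\tdst=null\tperm=flink:hdfs:rw-r--r--\tproto=rpc",
    "2019-05-15 10:00:09,000 INFO FSNamesystem.audit: allowed=true\tugi=flink (auth:SIMPLE)\tip=/10.0.0.3\tcmd=delete\tsrc=/flink/data/checkpoints/job-a/chk-1\tdst=null\tperm=null\tproto=rpc",
]

if __name__ == "__main__":
    for entry in audit_entries(SAMPLE, "/flink/data/checkpoints/job-a/chk-1/file-a"):
        print(entry)
```

A `delete` entry on the file or on any of its parent directories, timestamped before the failed close, would confirm that the file was removed while it was still being written.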

Best,
Congxian



Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by Andrey Zagrebin <an...@ververica.com>.
Hi,

Could you also post the job master logs and, ideally, the full task manager logs?
This failure can be caused by some other, earlier failure.
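
To look for such an earlier failure, it can help to merge every line that mentions the affected checkpoint from the job master and task manager logs into one timeline. The following is a rough sketch, assuming each log line starts with a sortable `yyyy-MM-dd HH:mm:ss,SSS` timestamp. The function name `checkpoint_timeline`, the labels and the sample messages are invented for illustration and are not Flink's actual log wording.

```python
import re


def checkpoint_timeline(logs, checkpoint_id):
    """Merge lines that mention `checkpoint_id` from several logs, oldest first.

    `logs` maps a label (for example "jobmanager") to an iterable of log lines.
    Returns a list of (timestamp, label, line) tuples sorted by timestamp.
    """
    # Match the id as a whole number: "checkpoint 7" or "chk-7", but not "70".
    pattern = re.compile(rf"(?<!\d){checkpoint_id}(?!\d)")
    hits = []
    for label, lines in logs.items():
        for line in lines:
            # Skip the 23-character timestamp so its digits never count as a match.
            if pattern.search(line[23:]):
                hits.append((line[:23], label, line.rstrip("\n")))
    return sorted(hits)


# Invented sample lines; real Flink messages are worded differently.
SAMPLE_LOGS = {
    "jobmanager": [
        "2019-05-15 10:00:00,000 INFO  coordinator - started checkpoint 7\n",
        "2019-05-15 10:00:10,000 INFO  coordinator - gave up on checkpoint 7\n",
        "2019-05-15 10:00:11,000 INFO  coordinator - started checkpoint 70\n",
    ],
    "taskmanager-1": [
        "2019-05-15 10:00:12,500 WARN  task - could not close chk-7/file\n",
    ],
}

if __name__ == "__main__":
    for timestamp, label, line in checkpoint_timeline(SAMPLE_LOGS, 7):
        print(label, line)
```

Reading the merged lines in order shows whether the coordinator already gave up on the checkpoint before the task reported its exception.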

Best,
Andrey


Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
Unfortunately, the audit logs for HDFS were not enabled. We will enable them
and post the results when the problem happens again. Nonetheless, we don't
have any other process using Hadoop besides Flink.




Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Pedro,
From the log you shared earlier, I found that checkpoint 65912 had already expired before the IOException was raised.
When a checkpoint expires, its checkpoint directory is deleted (CheckpointCoordinator#549 on the release-1.6 branch), while tasks that have not finished their snapshot still write to files in that directory, so they run into this IOException. Could you please also check the audit log for the checkpoint directory (and, if that directory was not deleted, also check its parent directory)?
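
The sequence described above can be sketched as a toy model. This is not Flink or HDFS code: `ToyNameNode`, `run_checkpoint`, the holder name and the paths are hypothetical, and the only behaviour modelled is that completing a file fails once its directory has been deleted.

```python
class ToyNameNode:
    """In-memory stand-in for the NameNode: tracks which files are open for writing."""

    def __init__(self):
        self.open_files = {}  # path -> holder that opened it

    def create(self, path, holder):
        self.open_files[path] = holder

    def delete_dir(self, directory):
        # Deleting a directory drops every file below it, open or not.
        prefix = directory.rstrip("/") + "/"
        for path in [p for p in self.open_files if p.startswith(prefix)]:
            del self.open_files[path]

    def complete(self, path, holder):
        # Closing a file only works while the holder still has it open.
        if self.open_files.get(path) != holder:
            raise FileNotFoundError(
                f"File does not exist: {path} Holder {holder} does not have any open files."
            )
        del self.open_files[path]


def run_checkpoint(write_seconds, timeout_seconds):
    """Return 'completed' or 'failed' for one task writing one state file."""
    namenode = ToyNameNode()
    checkpoint_dir = "/checkpoints/job/chk-1"
    state_file = checkpoint_dir + "/state-file"
    namenode.create(state_file, "task-1")
    if write_seconds > timeout_seconds:
        # The checkpoint expires first and its directory is discarded
        # while the task is still writing.
        namenode.delete_dir(checkpoint_dir)
    try:
        namenode.complete(state_file, "task-1")
        return "completed"
    except FileNotFoundError:
        return "failed"


if __name__ == "__main__":
    print(run_checkpoint(write_seconds=5, timeout_seconds=10))
    print(run_checkpoint(write_seconds=15, timeout_seconds=10))
```

In this model a write that finishes within the timeout completes normally, while a write that outlasts the timeout fails on close, in the same way as the `FileNotFoundException` in the trace.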


Best,
Congxian
On May 22, 2019, 04:16 +0800, PedroMrChaves <pe...@gmail.com>, wrote:
> The issue happened again.
>
> /AsynchronousException{java.lang.Exception: Could not materialize checkpoint
> 47400 for operator ENRICH (1/4).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 47400 for
> operator ENRICH (1/4).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File
> does not exist:
> /flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150
> (inode 188591918) Holder DFSClient_NONMAPREDUCE_-2031154839_1 does not have
> any open files.
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
> at
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
> at
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
>
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
> at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy12.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
> at sun.reflect.GeneratedMethodAccessor191.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> at com.sun.proxy.$Proxy13.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
> ... 12 more/
>
> Any Ideas?
>
>
>
> -----
> Best Regards,
> Pedro Chaves
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

Posted by PedroMrChaves <pe...@gmail.com>.
The issue happened again.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 47400 for operator ENRICH (1/4).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 47400 for operator ENRICH (1/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 in order to obtain the stream state handle
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
	... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /flink/data/checkpoints/da596dd6909fe250ccf553f9c31e000c/chk-47400/72df57cb-2286-4c1a-ade6-79891f39b150 (inode 188591918) Holder DFSClient_NONMAPREDUCE_-2031154839_1 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy12.complete(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
	at sun.reflect.GeneratedMethodAccessor191.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
	at com.sun.proxy.$Proxy13.complete(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:861)
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:839)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
	... 12 more

Any ideas?



-----
Best Regards,
Pedro Chaves