You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Kai Fu <zz...@gmail.com> on 2021/08/17 09:30:33 UTC

"Could not flush to file and close the file system output stream" error after checkpoint failure

Hi team,

We hit the following error after a checkpoint failure (due to a timeout), and
it caused the job to restart constantly. We do not have any clue why the
checkpoint failure leads to the IOException and then to the job restart.
Ideally, it should ignore the failed checkpoint and continue to execute.
Could anyone advise on this?


   1. java.lang.RuntimeException: unable to send request to worker
   2. at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.
   enqueue(ChannelStateWriterImpl.java:228)
   3. at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.
   abort(ChannelStateWriterImpl.java:189)
   4. at org.apache.flink.streaming.runtime.tasks.
   SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(
   SubtaskCheckpointCoordinatorImpl.java:363)
   5. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
   $notifyCheckpointAbortAsync$11(StreamTask.java:1139)
   6. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
   $notifyCheckpointOperation$12(StreamTask.java:1153)
   7. at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
   .runThrowing(StreamTaskActionExecutor.java:50)
   8. at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java
   :90)
   9. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   processMailsNonBlocking(MailboxProcessor.java:359)
   10. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   processMail(MailboxProcessor.java:323)
   11. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   runMailboxLoop(MailboxProcessor.java:202)
   12. at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop
   (StreamTask.java:681)
   13. at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(
   StreamTask.java:636)
   14. at org.apache.flink.streaming.runtime.tasks.StreamTask.
   runWithCleanUpOnFail(StreamTask.java:647)
   15. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
   StreamTask.java:620)
   16. at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
   17. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
   18. at java.lang.Thread.run(Thread.java:748)
   19. Caused by: java.io.IOException: Could not flush to file and close
   the file system output stream to hdfs:/user/hadoop/xxx-emr-v1/checkpoints
   /85d990ef016e2dca39704205558414e2/chk-2/514072d9-ed7b-4551-9950-a6c55406f0d2
   in order to obtain the stream state handle
   20. at org.apache.flink.runtime.state.filesystem.
   FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle
   (FsCheckpointStreamFactory.java:373)
   21. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.finishWriteAndResult(
   ChannelStateCheckpointWriter.java:210)
   22. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.doComplete(ChannelStateCheckpointWriter.java
   :232)
   23. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.lambda$complete$5(
   ChannelStateCheckpointWriter.java:194)
   24. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter.
   java:288)
   25. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.complete(ChannelStateCheckpointWriter.java:
   192)
   26. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateCheckpointWriter.completeInput(ChannelStateCheckpointWriter.
   java:179)
   27. at org.apache.flink.runtime.checkpoint.channel.
   CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
   28. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestDispatcherImpl.dispatchInternal(
   ChannelStateWriteRequestDispatcherImpl.java:82)
   29. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestDispatcherImpl.dispatch(
   ChannelStateWriteRequestDispatcherImpl.java:59)
   30. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestExecutorImpl.loop(
   ChannelStateWriteRequestExecutorImpl.java:96)
   31. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestExecutorImpl.run(
   ChannelStateWriteRequestExecutorImpl.java:75)
   32. ... 1 more
   33. Suppressed: java.lang.IllegalStateException: not running
   34. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestExecutorImpl.ensureRunning(
   ChannelStateWriteRequestExecutorImpl.java:152)
   35. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestExecutorImpl.submitInternal(
   ChannelStateWriteRequestExecutorImpl.java:144)
   36. at org.apache.flink.runtime.checkpoint.channel.
   ChannelStateWriteRequestExecutorImpl.submitPriority(
   ChannelStateWriteRequestExecutorImpl.java:133)
   37. at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl
   .enqueue(ChannelStateWriterImpl.java:223)
   38. at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl
   .abort(ChannelStateWriterImpl.java:189)
   39. at org.apache.flink.streaming.runtime.tasks.
   SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(
   SubtaskCheckpointCoordinatorImpl.java:363)
   40. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
   $notifyCheckpointAbortAsync$11(StreamTask.java:1139)
   41. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
   $notifyCheckpointOperation$12(StreamTask.java:1153)
   42. at org.apache.flink.streaming.runtime.tasks.
   StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   43. at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.
   java:90)
   44. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   processMailsNonBlocking(MailboxProcessor.java:359)
   45. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   processMail(MailboxProcessor.java:323)
   46. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.
   runMailboxLoop(MailboxProcessor.java:202)
   47. at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop
   (StreamTask.java:681)
   48. at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(
   StreamTask.java:636)
   49. at org.apache.flink.streaming.runtime.tasks.StreamTask.
   runWithCleanUpOnFail(StreamTask.java:647)
   50. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
   StreamTask.java:620)
   51. at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
   52. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
   53. ... 1 more
   54. Caused by: org.apache.hadoop.ipc.RemoteException(java.io.
   FileNotFoundException): File does not exist: /user/hadoop/xxx-emr-v1/
   checkpoints/85d990ef016e2dca39704205558414e2/chk-2/514072d9-ed7b-4551-
   9950-a6c55406f0d2 (inode 61461) Holder
   DFSClient_NONMAPREDUCE_-1635451037_103
   does not have any open files.
   55. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(
   FSNamesystem.java:2906)
   56. at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.
   completeFileInternal(FSDirWriteFileOp.java:693)
   57. at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.
   completeFile(FSDirWriteFileOp.java:679)
   58. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(
   FSNamesystem.java:2949)
   59. at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(
   NameNodeRpcServer.java:947)
   60. at org.apache.hadoop.hdfs.protocolPB.
   ClientNamenodeProtocolServerSideTranslatorPB.complete(
   ClientNamenodeProtocolServerSideTranslatorPB.java:620)
   61. at org.apache.hadoop.hdfs.protocol.proto.
   ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(
   ClientNamenodeProtocolProtos.java)
   62. at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.
   call(ProtobufRpcEngine.java:528)
   63. at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
   64. at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
   65. at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
   66. at java.security.AccessController.doPrivileged(Native Method)
   67. at javax.security.auth.Subject.doAs(Subject.java:422)
   68. at org.apache.hadoop.security.UserGroupInformation.doAs(
   UserGroupInformation.java:1730)
   69. at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
   70.
   71. at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
   72. at org.apache.hadoop.ipc.Client.call(Client.java:1491)
   73. at org.apache.hadoop.ipc.Client.call(Client.java:1388)
   74. at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
   ProtobufRpcEngine.java:233)
   75. at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
   ProtobufRpcEngine.java:118)
   76. at com.sun.proxy.$Proxy40.complete(Unknown Source)
   77. at org.apache.hadoop.hdfs.protocolPB.
   ClientNamenodeProtocolTranslatorPB.complete(
   ClientNamenodeProtocolTranslatorPB.java:557)
   78. at sun.reflect.GeneratedMethodAccessor166.invoke(Unknown Source)
   79. at sun.reflect.DelegatingMethodAccessorImpl.invoke(
   DelegatingMethodAccessorImpl.java:43)
   80. at java.lang.reflect.Method.invoke(Method.java:498)
   81. at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
   RetryInvocationHandler.java:422)
   82. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
   invokeMethod(RetryInvocationHandler.java:165)
   83. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(
   RetryInvocationHandler.java:157)
   84. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(
   RetryInvocationHandler.java:95)
   85. at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
   RetryInvocationHandler.java:359)
   86. at com.sun.proxy.$Proxy41.complete(Unknown Source)
   87. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(
   DFSOutputStream.java:949)
   88. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(
   DFSOutputStream.java:907)
   89. at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.
   java:890)
   90. at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:
   845)
   91. at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(
   FSDataOutputStream.java:73)
   92. at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.
   java:102)
   93. at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(
   HadoopDataOutputStream.java:52)
   94. at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(
   ClosingFSDataOutputStream.java:64)
   95. at org.apache.flink.runtime.state.filesystem.
   FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle
   (FsCheckpointStreamFactory.java:354)
   96. ... 12 more



-- 
*Best wishes,*
*- Kai*

Re: "Could not flush to file and close the file system output stream" error after checkpoint failure

Posted by Arvid Heise <ar...@apache.org>.
Hi Kai,

This seems to be a weird HDFS issue to me. Either Flink is using HDFS
incorrectly or there is a bug in the libraries. It could be connected to
concurrency [1]. I'm pulling in @Roman Khachatryan <ro...@apache.org> who may
know more about this part.

I see two workarounds:
- Don't use HDFS. Since you run on EMR, S3 should be the preferred option.
- Don't use unaligned checkpoints. If you don't have backpressure, you
could run without unaligned checkpoints [2].

[1]
https://community.cloudera.com/t5/Support-Questions/How-to-resolve-No-lease-on-inode-364006128-File-does-not/td-p/220963
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#checkpointing

On Tue, Aug 17, 2021 at 11:30 AM Kai Fu <zz...@gmail.com> wrote:

> Hi team,
>
> We faced the following error after checkpoint failure(due to timeout) and
> caused the job to restart constantly. We do not have any clue about why the
> checkpoint failure causes the IOException and causes the job restart.
> Ideally, it should ignore the failed checkpoint and continue to execute.
> Could anyone advise on this?
>
>
>    1. java.lang.RuntimeException: unable to send request to worker
>    2. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:228)
>    3. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:189)
>    4. at org.apache.flink.streaming.runtime.tasks.
>    SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(
>    SubtaskCheckpointCoordinatorImpl.java:363)
>    5. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
>    $notifyCheckpointAbortAsync$11(StreamTask.java:1139)
>    6. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
>    $notifyCheckpointOperation$12(StreamTask.java:1153)
>    7. at org.apache.flink.streaming.runtime.tasks.
>    StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50
>    )
>    8. at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.
>    java:90)
>    9. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>    .processMailsNonBlocking(MailboxProcessor.java:359)
>    10. at org.apache.flink.streaming.runtime.tasks.mailbox.
>    MailboxProcessor.processMail(MailboxProcessor.java:323)
>    11. at org.apache.flink.streaming.runtime.tasks.mailbox.
>    MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>    12. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    runMailboxLoop(StreamTask.java:681)
>    13. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    executeInvoke(StreamTask.java:636)
>    14. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    runWithCleanUpOnFail(StreamTask.java:647)
>    15. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>    StreamTask.java:620)
>    16. at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>    17. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>    18. at java.lang.Thread.run(Thread.java:748)
>    19. Caused by: java.io.IOException: Could not flush to file and close
>    the file system output stream to hdfs:/user/hadoop/xxx-emr-v1/
>    checkpoints/85d990ef016e2dca39704205558414e2/chk-2/514072d9-ed7b-4551-
>    9950-a6c55406f0d2 in order to obtain the stream state handle
>    20. at org.apache.flink.runtime.state.filesystem.
>    FsCheckpointStreamFactory$FsCheckpointStateOutputStream.
>    closeAndGetHandle(FsCheckpointStreamFactory.java:373)
>    21. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.finishWriteAndResult(
>    ChannelStateCheckpointWriter.java:210)
>    22. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.doComplete(ChannelStateCheckpointWriter.
>    java:232)
>    23. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.lambda$complete$5(
>    ChannelStateCheckpointWriter.java:194)
>    24. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.runWithChecks(ChannelStateCheckpointWriter
>    .java:288)
>    25. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.complete(ChannelStateCheckpointWriter.java
>    :192)
>    26. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateCheckpointWriter.completeInput(ChannelStateCheckpointWriter
>    .java:179)
>    27. at org.apache.flink.runtime.checkpoint.channel.
>    CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
>    28. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestDispatcherImpl.dispatchInternal(
>    ChannelStateWriteRequestDispatcherImpl.java:82)
>    29. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestDispatcherImpl.dispatch(
>    ChannelStateWriteRequestDispatcherImpl.java:59)
>    30. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestExecutorImpl.loop(
>    ChannelStateWriteRequestExecutorImpl.java:96)
>    31. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestExecutorImpl.run(
>    ChannelStateWriteRequestExecutorImpl.java:75)
>    32. ... 1 more
>    33. Suppressed: java.lang.IllegalStateException: not running
>    34. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestExecutorImpl.ensureRunning(
>    ChannelStateWriteRequestExecutorImpl.java:152)
>    35. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestExecutorImpl.submitInternal(
>    ChannelStateWriteRequestExecutorImpl.java:144)
>    36. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriteRequestExecutorImpl.submitPriority(
>    ChannelStateWriteRequestExecutorImpl.java:133)
>    37. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:223)
>    38. at org.apache.flink.runtime.checkpoint.channel.
>    ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:189)
>    39. at org.apache.flink.streaming.runtime.tasks.
>    SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(
>    SubtaskCheckpointCoordinatorImpl.java:363)
>    40. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
>    $notifyCheckpointAbortAsync$11(StreamTask.java:1139)
>    41. at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda
>    $notifyCheckpointOperation$12(StreamTask.java:1153)
>    42. at org.apache.flink.streaming.runtime.tasks.
>    StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50
>    )
>    43. at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.
>    java:90)
>    44. at org.apache.flink.streaming.runtime.tasks.mailbox.
>    MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
>    45. at org.apache.flink.streaming.runtime.tasks.mailbox.
>    MailboxProcessor.processMail(MailboxProcessor.java:323)
>    46. at org.apache.flink.streaming.runtime.tasks.mailbox.
>    MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
>    47. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    runMailboxLoop(StreamTask.java:681)
>    48. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    executeInvoke(StreamTask.java:636)
>    49. at org.apache.flink.streaming.runtime.tasks.StreamTask.
>    runWithCleanUpOnFail(StreamTask.java:647)
>    50. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>    StreamTask.java:620)
>    51. at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>    52. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>    53. ... 1 more
>    54. Caused by: org.apache.hadoop.ipc.RemoteException(java.io.
>    FileNotFoundException): File does not exist: /user/hadoop/xxx-emr-v1/
>    checkpoints/85d990ef016e2dca39704205558414e2/chk-2/514072d9-ed7b-4551-
>    9950-a6c55406f0d2 (inode 61461) Holder DFSClient_NONMAPREDUCE_-
>    1635451037_103 does not have any open files.
>    55. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(
>    FSNamesystem.java:2906)
>    56. at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.
>    completeFileInternal(FSDirWriteFileOp.java:693)
>    57. at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.
>    completeFile(FSDirWriteFileOp.java:679)
>    58. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile
>    (FSNamesystem.java:2949)
>    59. at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
>    complete(NameNodeRpcServer.java:947)
>    60. at org.apache.hadoop.hdfs.protocolPB.
>    ClientNamenodeProtocolServerSideTranslatorPB.complete(
>    ClientNamenodeProtocolServerSideTranslatorPB.java:620)
>    61. at org.apache.hadoop.hdfs.protocol.proto.
>    ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.
>    callBlockingMethod(ClientNamenodeProtocolProtos.java)
>    62. at org.apache.hadoop.ipc.
>    ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java
>    :528)
>    63. at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
>    64. at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
>    65. at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
>    66. at java.security.AccessController.doPrivileged(Native Method)
>    67. at javax.security.auth.Subject.doAs(Subject.java:422)
>    68. at org.apache.hadoop.security.UserGroupInformation.doAs(
>    UserGroupInformation.java:1730)
>    69. at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
>    70.
>    71. at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
>    72. at org.apache.hadoop.ipc.Client.call(Client.java:1491)
>    73. at org.apache.hadoop.ipc.Client.call(Client.java:1388)
>    74. at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
>    ProtobufRpcEngine.java:233)
>    75. at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
>    ProtobufRpcEngine.java:118)
>    76. at com.sun.proxy.$Proxy40.complete(Unknown Source)
>    77. at org.apache.hadoop.hdfs.protocolPB.
>    ClientNamenodeProtocolTranslatorPB.complete(
>    ClientNamenodeProtocolTranslatorPB.java:557)
>    78. at sun.reflect.GeneratedMethodAccessor166.invoke(Unknown Source)
>    79. at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>    DelegatingMethodAccessorImpl.java:43)
>    80. at java.lang.reflect.Method.invoke(Method.java:498)
>    81. at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
>    RetryInvocationHandler.java:422)
>    82. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
>    invokeMethod(RetryInvocationHandler.java:165)
>    83. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(
>    RetryInvocationHandler.java:157)
>    84. at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
>    invokeOnce(RetryInvocationHandler.java:95)
>    85. at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
>    RetryInvocationHandler.java:359)
>    86. at com.sun.proxy.$Proxy41.complete(Unknown Source)
>    87. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(
>    DFSOutputStream.java:949)
>    88. at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(
>    DFSOutputStream.java:907)
>    89. at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream
>    .java:890)
>    90. at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.
>    java:845)
>    91. at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(
>    FSDataOutputStream.java:73)
>    92. at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream
>    .java:102)
>    93. at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(
>    HadoopDataOutputStream.java:52)
>    94. at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(
>    ClosingFSDataOutputStream.java:64)
>    95. at org.apache.flink.runtime.state.filesystem.
>    FsCheckpointStreamFactory$FsCheckpointStateOutputStream.
>    closeAndGetHandle(FsCheckpointStreamFactory.java:354)
>    96. ... 12 more
>
>
>
> --
> *Best wishes,*
> *- Kai*
>