You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Rainie Li <ra...@pinterest.com> on 2021/08/03 07:55:33 UTC

Flink job failure during yarn node termination

Hi Flink Community,

My flink application is running version 1.9 and it failed to recover
(application was running but checkpoint failed and job stopped to process
data) during hadoop yarn node termination.

*Here is job manager log error:*
*2021-07-26 18:02:58,605 INFO
 org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception
while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
<http://10.1.185.175:8020>. Trying to fail over immediately.*
*org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby*
at
org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
at
org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
at
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
at
org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
at java.lang.Iterable.forEach(Iterable.java:75)
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

*Here is error from task manager:*
*2021-07-26 18:01:15,313 ERROR
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
Encountered error while consuming partitions*
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)

*2021-07-26 18:01:15,337 WARN
 org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
while canceling task.*
java.lang.Exception:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
at
com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
at
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
at
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
... 1 more

Any idea what could be the root cause and how to fix it?

Thanks
Best regards
Rainie

Re: Flink job failure during yarn node termination

Posted by Rainie Li <ra...@pinterest.com>.
Hi Nicolaus,

I double checked again our hdfs config, it is setting 1 instead of 2.
I will try the solution you provided.

Thanks again.
Best regards
Rainie

On Wed, Aug 4, 2021 at 10:40 AM Rainie Li <ra...@pinterest.com> wrote:

> Thanks for the context Nicolaus.
> We are using S3 instead of HDFS.
>
> Best regards
> Rainie
>
> On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <
> nicolaus.weidner@ververica.com> wrote:
>
>> Hi Rainie,
>>
>> I found a similar issue on stackoverflow, though quite different
>> stacktrace:
>> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
>> Do you replicate data on multiple hdfs nodes like suggested in the answer
>> there?
>>
>> Best,
>> Nico
>>
>> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <ra...@pinterest.com> wrote:
>>
>>> Thanks Till.
>>> We terminated one of the worker nodes.
>>> We enabled HA by using Zookeeper.
>>> Sure, we will try upgrade job to newer version.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>>> you terminating? Have you configured your Yarn cluster to be highly
>>>> available in case you are terminating the ResourceManager?
>>>>
>>>> Flink should retry the operation of starting a new container in case it
>>>> fails. If this is not the case, then please upgrade to one of the actively
>>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>>> version.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <ra...@pinterest.com>
>>>> wrote:
>>>>
>>>>> Hi Flink Community,
>>>>>
>>>>> My flink application is running version 1.9 and it failed to recover
>>>>> (application was running but checkpoint failed and job stopped to process
>>>>> data) during hadoop yarn node termination.
>>>>>
>>>>> *Here is job manager log error:*
>>>>> *2021-07-26 18:02:58,605 INFO
>>>>>  org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception
>>>>> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
>>>>> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
>>>>> <http://10.1.185.175:8020>. Trying to fail over immediately.*
>>>>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>> Operation category READ is not supported in state standby*
>>>>> at
>>>>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>>> at
>>>>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>>>>> at
>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>>>>> at
>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>>>>> at
>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>>>>> at
>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>>>>> at
>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>> at
>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>> at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>>>>
>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>>>>> at
>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>>>>> at
>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>>>> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>>> at
>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>>>>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>>>>> at
>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>>>> at
>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>>>> at
>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>> at
>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>>>> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>>>>> at
>>>>> org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>>>>> at
>>>>> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>>>>> at
>>>>> org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>>>>> at java.lang.Iterable.forEach(Iterable.java:75)
>>>>> at
>>>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at
>>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> *Here is error from task manager:*
>>>>> *2021-07-26 18:01:15,313 ERROR
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
>>>>> Encountered error while consuming partitions*
>>>>> java.io.IOException: Connection reset by peer
>>>>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>>> at
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *2021-07-26 18:01:15,337 WARN
>>>>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>>>>> while canceling task.*
>>>>> java.lang.Exception:
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by:
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>>>> at
>>>>> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>>>> at
>>>>> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>>>>> ... 1 more
>>>>>
>>>>> Any idea what could be the root cause and how to fix it?
>>>>>
>>>>> Thanks
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>

Re: Flink job failure during yarn node termination

Posted by Rainie Li <ra...@pinterest.com>.
Thanks for the context Nicolaus.
We are using S3 instead of HDFS.

Best regards
Rainie

On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <
nicolaus.weidner@ververica.com> wrote:

> Hi Rainie,
>
> I found a similar issue on stackoverflow, though quite different
> stacktrace:
> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
> Do you replicate data on multiple hdfs nodes like suggested in the answer
> there?
>
> Best,
> Nico
>
> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <ra...@pinterest.com> wrote:
>
>> Thanks Till.
>> We terminated one of the worker nodes.
>> We enabled HA by using Zookeeper.
>> Sure, we will try upgrade job to newer version.
>>
>> Best regards
>> Rainie
>>
>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Rainie,
>>>
>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>> you terminating? Have you configured your Yarn cluster to be highly
>>> available in case you are terminating the ResourceManager?
>>>
>>> Flink should retry the operation of starting a new container in case it
>>> fails. If this is not the case, then please upgrade to one of the actively
>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>> version.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <ra...@pinterest.com> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> My flink application is running version 1.9 and it failed to recover
>>>> (application was running but checkpoint failed and job stopped to process
>>>> data) during hadoop yarn node termination.
>>>>
>>>> *Here is job manager log error:*
>>>> *2021-07-26 18:02:58,605 INFO
>>>>  org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception
>>>> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
>>>> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
>>>> <http://10.1.185.175:8020>. Trying to fail over immediately.*
>>>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>> Operation category READ is not supported in state standby*
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>>>> at
>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>>>> at
>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>> at
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>>>
>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>>>> at
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>>>> at
>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>>> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>> at
>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>>>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>>> at
>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>>> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>>>> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>>>> at java.lang.Iterable.forEach(Iterable.java:75)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> *Here is error from task manager:*
>>>> *2021-07-26 18:01:15,313 ERROR
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
>>>> Encountered error while consuming partitions*
>>>> java.io.IOException: Connection reset by peer
>>>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *2021-07-26 18:01:15,337 WARN
>>>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>>>> while canceling task.*
>>>> java.lang.Exception:
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by:
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>>> at
>>>> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>>>> ... 1 more
>>>>
>>>> Any idea what could be the root cause and how to fix it?
>>>>
>>>> Thanks
>>>> Best regards
>>>> Rainie
>>>>
>>>

Re: Flink job failure during yarn node termination

Posted by Rainie Li <ra...@pinterest.com>.
Thanks Till.
We terminated one of the worker nodes.
We enabled HA by using Zookeeper.
Sure, we will try upgrade job to newer version.

Best regards
Rainie

On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Rainie,
>
> It looks to me as if Yarn is causing this problem. Which Yarn node are you
> terminating? Have you configured your Yarn cluster to be highly available
> in case you are terminating the ResourceManager?
>
> Flink should retry the operation of starting a new container in case it
> fails. If this is not the case, then please upgrade to one of the actively
> maintained Flink versions (1.12 or 1.13) and try whether it works with this
> version.
>
> Cheers,
> Till
>
> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <ra...@pinterest.com> wrote:
>
>> Hi Flink Community,
>>
>> My flink application is running version 1.9 and it failed to recover
>> (application was running but checkpoint failed and job stopped to process
>> data) during hadoop yarn node termination.
>>
>> *Here is job manager log error:*
>> *2021-07-26 18:02:58,605 INFO
>>  org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception
>> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
>> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
>> <http://10.1.185.175:8020>. Trying to fail over immediately.*
>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category READ is not supported in state standby*
>> at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>> at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>> at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>> at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>
>> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>> at
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>> at
>> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>> at
>> org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>> at java.lang.Iterable.forEach(Iterable.java:75)
>> at
>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> *Here is error from task manager:*
>> *2021-07-26 18:01:15,313 ERROR
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
>> Encountered error while consuming partitions*
>> java.io.IOException: Connection reset by peer
>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>> at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> *2021-07-26 18:01:15,337 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>> while canceling task.*
>> java.lang.Exception:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>> at
>> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>> at
>> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>> ... 1 more
>>
>> Any idea what could be the root cause and how to fix it?
>>
>> Thanks
>> Best regards
>> Rainie
>>
>

Re: Flink job failure during yarn node termination

Posted by Till Rohrmann <tr...@apache.org>.
Hi Rainie,

It looks to me as if Yarn is causing this problem. Which Yarn node are you
terminating? Have you configured your Yarn cluster to be highly available
in case you are terminating the ResourceManager?

Flink should retry the operation of starting a new container in case it
fails. If this is not the case, then please upgrade to one of the actively
maintained Flink versions (1.12 or 1.13) and try whether it works with this
version.

Cheers,
Till

On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <ra...@pinterest.com> wrote:

> Hi Flink Community,
>
> My flink application is running version 1.9 and it failed to recover
> (application was running but checkpoint failed and job stopped to process
> data) during hadoop yarn node termination.
>
> *Here is job manager log error:*
> *2021-07-26 18:02:58,605 INFO
>  org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception
> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
> <http://10.1.185.175:8020>. Trying to fail over immediately.*
> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category READ is not supported in state standby*
> at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
> at
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
> at
> org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> *Here is error from task manager:*
> *2021-07-26 18:01:15,313 ERROR
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
> Encountered error while consuming partitions*
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:748)
>
> *2021-07-26 18:01:15,337 WARN
>  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
> while canceling task.*
> java.lang.Exception:
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
> at
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
> at
> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
> at
> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
> at
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
> ... 1 more
>
> Any idea what could be the root cause and how to fix it?
>
> Thanks
> Best regards
> Rainie
>