Posted to user@flink.apache.org by Rainie Li <ra...@pinterest.com> on 2021/08/03 07:55:33 UTC
Flink job failure during yarn node termination
Hi Flink Community,
My Flink application is running version 1.9, and it failed to recover
during a Hadoop YARN node termination (the application kept running, but
checkpoints failed and the job stopped processing data).
*Here is the error from the job manager log:*
*2021-07-26 18:02:58,605 INFO
org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020. Trying to fail
over immediately.*
*org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby*
at
org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
at
org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
at
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
at
org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
at java.lang.Iterable.forEach(Iterable.java:75)
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
*Here is the error from the task manager:*
*2021-07-26 18:01:15,313 ERROR
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue -
Encountered error while consuming partitions*
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
*2021-07-26 18:01:15,337 WARN
org.apache.flink.streaming.runtime.tasks.StreamTask - Error
while canceling task.*
java.lang.Exception:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
at
com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
at
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
at
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
... 1 more
Any idea what could be the root cause and how to fix it?
Thanks
Best regards
Rainie
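A note on the job manager entry above: it is logged at INFO by
RetryInvocationHandler and ends with "Trying to fail over immediately",
which is what an HDFS client configured for NameNode HA typically prints
when the first NameNode it contacts is in standby and it moves on to the
next one. On its own, that entry may not be the failure. The Python sketch
below only illustrates that try-the-next-NameNode loop. FakeNameNode,
StandbyError, get_file_info_with_failover and max_failovers are
hypothetical names for illustration, not Hadoop APIs.

```python
class StandbyError(Exception):
    """Illustrative stand-in for org.apache.hadoop.ipc.StandbyException."""


class FakeNameNode:
    """Hypothetical NameNode stub that only serves reads while active."""

    def __init__(self, name, active):
        self.name = name
        self.active = active

    def get_file_info(self, path):
        if not self.active:
            raise StandbyError(
                "Operation category READ is not supported in state standby"
            )
        return {"path": path, "served_by": self.name}


def get_file_info_with_failover(namenodes, path, max_failovers=3):
    """Try each NameNode in turn; fail over when one reports standby.

    Returns (file_info, log_lines). max_failovers is an arbitrary
    illustrative limit, not a Hadoop default.
    """
    log_lines = []
    index = 0
    for _ in range(max_failovers + 1):
        node = namenodes[index]
        try:
            return node.get_file_info(path), log_lines
        except StandbyError as err:
            # Record the same kind of INFO line seen in the job manager log.
            log_lines.append(
                f"INFO Exception while invoking getFileInfo over {node.name}: "
                f"{err}. Trying to fail over immediately."
            )
            index = (index + 1) % len(namenodes)
    raise RuntimeError("no active NameNode found after failover attempts")


if __name__ == "__main__":
    nodes = [
        FakeNameNode("namenode-0", active=False),
        FakeNameNode("namenode-1", active=True),
    ]
    info, log_lines = get_file_info_with_failover(nodes, "/user/flink/app.jar")
    print(log_lines)
    print(info)
```

If the call succeeds on the second NameNode, the INFO line is the only
trace left in the log, so the task manager entries are probably the more
informative ones to follow up on.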
Re: Flink job failure during yarn node termination
Posted by Rainie Li <ra...@pinterest.com>.
Hi Nicolaus,
I double-checked our HDFS config again: it is set to 1 instead of 2.
I will try the solution you provided.
Thanks again.
Best regards
Rainie
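Assuming the setting in question is dfs.replication in hdfs-site.xml (the
thread does not name the property), one quick way to confirm the configured
value is to read it from the file. This is a minimal Python sketch; the
helper name read_replication and the sample XML are illustrative, and the
fallback of 3 is Hadoop's default for dfs.replication.

```python
import xml.etree.ElementTree as ET

HDFS_DEFAULT_REPLICATION = 3  # Hadoop's default when dfs.replication is unset


def read_replication(hdfs_site_xml):
    """Return dfs.replication from the text of an hdfs-site.xml file."""
    root = ET.fromstring(hdfs_site_xml)
    for prop in root.findall("property"):
        if prop.findtext("name", "").strip() == "dfs.replication":
            return int(prop.findtext("value", "").strip())
    return HDFS_DEFAULT_REPLICATION


# Illustrative file content, not taken from the cluster in this thread.
SAMPLE = """
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
"""

if __name__ == "__main__":
    replication = read_replication(SAMPLE)
    if replication < 2:
        print(f"dfs.replication={replication}: each block has a single copy")
```

With a value of 1, any block stored only on the terminated node has no
other copy to read from.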
On Wed, Aug 4, 2021 at 10:40 AM Rainie Li <ra...@pinterest.com> wrote:
> Thanks for the context Nicolaus.
> We are using S3 instead of HDFS.
>
> Best regards
> Rainie
>
> On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <
> nicolaus.weidner@ververica.com> wrote:
>
>> Hi Rainie,
>>
>> I found a similar issue on Stack Overflow, though with a quite different
>> stack trace:
>> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
>> Do you replicate data across multiple HDFS nodes, as suggested in the answer
>> there?
>>
>> Best,
>> Nico
>>
>> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <ra...@pinterest.com> wrote:
>>
>>> Thanks Till.
>>> We terminated one of the worker nodes.
>>> We enabled HA by using ZooKeeper.
>>> Sure, we will try upgrading the job to a newer version.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>>> you terminating? Have you configured your Yarn cluster to be highly
>>>> available in case you are terminating the ResourceManager?
>>>>
>>>> Flink should retry the operation of starting a new container in case it
>>>> fails. If this is not the case, then please upgrade to one of the actively
>>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>>> version.
>>>>
>>>> Cheers,
>>>> Till
Re: Flink job failure during yarn node termination
Posted by Rainie Li <ra...@pinterest.com>.
Thanks for the context Nicolaus.
We are using S3 instead of HDFS.
Best regards
Rainie
On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <
nicolaus.weidner@ververica.com> wrote:
> Hi Rainie,
>
> I found a similar issue on Stack Overflow, though with a quite different
> stack trace:
> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
> Do you replicate data across multiple HDFS nodes, as suggested in the answer
> there?
>
> Best,
> Nico
>
> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <ra...@pinterest.com> wrote:
>
>> Thanks Till.
>> We terminated one of the worker nodes.
>> We enabled HA by using ZooKeeper.
>> Sure, we will try upgrading the job to a newer version.
>>
>> Best regards
>> Rainie
>>
>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Rainie,
>>>
>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>> you terminating? Have you configured your Yarn cluster to be highly
>>> available in case you are terminating the ResourceManager?
>>>
>>> Flink should retry the operation of starting a new container in case it
>>> fails. If this is not the case, then please upgrade to one of the actively
>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>> version.
>>>
>>> Cheers,
>>> Till
Re: Flink job failure during yarn node termination
Posted by Rainie Li <ra...@pinterest.com>.
Thanks Till.
We terminated one of the worker nodes.
We enabled HA by using ZooKeeper.
Sure, we will try upgrading the job to a newer version.
Best regards
Rainie
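Since "HA" can mean either Flink job manager HA or YARN ResourceManager HA,
it can help to confirm which Flink-side options are actually set. The Python
sketch below checks the text of a flink-conf.yaml for the ZooKeeper HA keys
documented for Flink (high-availability, high-availability.zookeeper.quorum,
high-availability.storageDir). The helper names, the sample values, and the
simple "key: value" parsing are assumptions for illustration.

```python
REQUIRED_HA_KEYS = (
    "high-availability",
    "high-availability.zookeeper.quorum",
    "high-availability.storageDir",
)


def parse_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blanks and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        # Split on the first colon only, so host:port values stay intact.
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def missing_ha_keys(conf):
    """Return the required HA keys that are absent or empty."""
    return [key for key in REQUIRED_HA_KEYS if not conf.get(key)]


# Hypothetical configuration text, not taken from the cluster in this thread.
SAMPLE = """
# hypothetical values
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
"""

if __name__ == "__main__":
    print(missing_ha_keys(parse_flink_conf(SAMPLE)))
```

This only covers the Flink side. Whether the YARN ResourceManager itself is
highly available is a separate setting in the YARN configuration.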
On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:
> Hi Rainie,
>
> It looks to me as if Yarn is causing this problem. Which Yarn node are you
> terminating? Have you configured your Yarn cluster to be highly available
> in case you are terminating the ResourceManager?
>
> Flink should retry the operation of starting a new container in case it
> fails. If this is not the case, then please upgrade to one of the actively
> maintained Flink versions (1.12 or 1.13) and try whether it works with this
> version.
>
> Cheers,
> Till
Re: Flink job failure during yarn node termination
Posted by Till Rohrmann <tr...@apache.org>.
Hi Rainie,
It looks to me as if Yarn is causing this problem. Which Yarn node are you
terminating? Have you configured your Yarn cluster to be highly available
in case you are terminating the ResourceManager?
Flink should retry the operation of starting a new container in case it
fails. If this is not the case, then please upgrade to one of the actively
maintained Flink versions (1.12 or 1.13) and try whether it works with this
version.
Cheers,
Till
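For reference, a minimal sketch of the Flink-side settings that relate to the questions above (ZooKeeper HA and JobManager restarts on Yarn) might look like the following. The keys are standard flink-conf.yaml options; the ZooKeeper hosts, the HDFS path, and the attempt count are placeholder values and are not taken from this thread.

```yaml
# flink-conf.yaml (sketch; all values below are hypothetical placeholders)

# Use ZooKeeper for JobManager leader election and recovery metadata.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# Durable storage for the JobManager recovery metadata.
high-availability.storageDir: hdfs:///flink/ha/

# Number of times Yarn may restart the application master (JobManager).
yarn.application-attempts: 10
```

Note that Yarn's own yarn.resourcemanager.am.max-attempts setting caps the effective number of attempts, and that ResourceManager HA is configured separately on the Yarn side (yarn-site.xml), not in flink-conf.yaml.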