Posted to user@flink.apache.org by Zbyszko Papierski <zp...@wikimedia.org> on 2021/08/20 15:07:13 UTC

Sending the partition request to 'null' failed on Kubernetes cluster, session mode

Hi!

We're trying to deploy our application to our Kubernetes cluster and we
seem to have hit a snag. Long story short: any kind of deployment that
involves a cluster of more than one TM seems to fail our job almost
immediately with this exception:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
    at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
    at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.ClosedChannelException
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    ... 11 more


I don't have enough experience to judge from the rest of the logs what the
reason might be, but I'm including the debug logs that I can get from
kubectl, for both JM and TM.
We use a session cluster deployed on Kubernetes (not native Kubernetes;
some issues still prevent us from using it) and we deploy our app to
that cluster. We have confirmed that everything works when there's a single
Task Manager, but we would rather not continue with that limitation. The way
we define the cluster itself on k8s is shown here [1] and the chart for the
deployment itself can be found here [2]. The app we're deploying is available
here [3]. We're running Flink 1.12.1 on openjdk-jre 11.

Since I overslept the k8s revolution a bit and am somewhat new to it, I am
not sure which information would make the situation clearer, but any
help is greatly appreciated!


[1]
https://github.com/wikimedia/operations-deployment-charts/tree/master/charts/flink-session-cluster
[2]
https://github.com/wikimedia/operations-deployment-charts/tree/master/helmfile.d/services/rdf-streaming-updater
[3]
https://github.com/wikimedia/wikidata-query-rdf/tree/master/streaming-updater-producer
-- 

Zbyszko Papierski (He/Him)

Senior Software Engineer

Wikimedia Foundation <https://wikimediafoundation.org/>

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

Posted by Zbyszko Papierski <zp...@wikimedia.org>.
Yes, it turned out that we had only configured TM<->JM communication
correctly. The inter-TM config was missing, so traffic between TMs was
"reject all". Thanks for the suggestion!
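
For readers hitting the same error, here is a minimal sketch of what an
inter-TM allow rule could look like. It is not the actual change made to the
charts linked in the original post. The policy name, the pod label
(component=taskmanager) and the port numbers (6121, 6122) are assumptions
for illustration; the ports would have to match fixed values set through
Flink's taskmanager.data.port and taskmanager.rpc.port options, since the
data port is chosen at random when left at its default. The script builds a
Kubernetes NetworkPolicy manifest and prints it as JSON.

```python
import json


def inter_tm_network_policy(name, pod_labels, ports):
    """Build a NetworkPolicy that lets pods carrying pod_labels accept
    TCP connections from other pods carrying the same labels."""
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {"name": name},
        "spec": {
            # The policy applies to the TaskManager pods themselves...
            "podSelector": {"matchLabels": pod_labels},
            "policyTypes": ["Ingress"],
            "ingress": [
                {
                    # ...and admits traffic only from their peers.
                    "from": [{"podSelector": {"matchLabels": pod_labels}}],
                    "ports": [{"protocol": "TCP", "port": p} for p in ports],
                }
            ],
        },
    }


# Hypothetical name, label and ports; adjust to the real deployment.
policy = inter_tm_network_policy(
    "flink-tm-to-tm", {"component": "taskmanager"}, [6121, 6122]
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl apply -f.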

On Mon, Aug 23, 2021 at 5:29 PM Arvid Heise <ar...@apache.org> wrote:

> It rather looks to me as if the task managers cannot communicate with each
> other. Can you check your network policies? Are they allowed to communicate
> on random ports?

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

Posted by Arvid Heise <ar...@apache.org>.
It rather looks to me as if the task managers cannot communicate with each
other. Can you check your network policies? Are they allowed to communicate
on random ports?
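
One way to test this suggestion is a plain TCP probe run from inside one
TaskManager pod against another. The sketch below assumes Python is
available in the container; the peer IP and port in the usage comment are
placeholders, not values from this thread. It only checks whether a TCP
connection can be opened and says nothing about Flink itself.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts and timeouts,
        # which is what a dropping or rejecting network policy produces.
        return False


# Example usage (hypothetical peer pod IP and data port):
#   can_connect("10.0.0.12", 6121)
```

A False result between two TaskManager pods, combined with a True result
from a TaskManager to the JobManager, would point at the network policy
rather than at Flink.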

On Mon, Aug 23, 2021 at 8:37 AM Zbyszko Papierski <zp...@wikimedia.org>
wrote:

> Hi,
>
> No, they don't. Only the job is restarted after that, without any luck.
> The exception I provided is added to the exceptions list of the job itself.

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

Posted by Zbyszko Papierski <zp...@wikimedia.org>.
Hi,

No, they don't. Only the job is restarted after that, without any luck.
The exception I provided is added to the exceptions list of the job itself.

On Mon, Aug 23, 2021 at 4:50 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> It might be that some task managers cannot reach the job manager in time.
> Have any of the task manager instances restarted after this failure?
> If so, what do the logs (Flink log and Kubernetes log) of the failed task
> manager say?

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

It might be that some task managers cannot reach the job manager in time.
Have any of the task manager instances restarted after this failure?
If so, what do the logs (Flink log and Kubernetes log) of the failed task
manager say?
