You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com> on 2018/07/23 06:10:56 UTC

回复:Network PartitionNotFoundException when run on multi nodes

Hi Steffen,

This exception indicates that when the downstream task requests partition from the upstream task, the upstream task has not initialized to register its result partition.
In this case, the downstream task will inquire the state from job manager, and then retry to request partition from the upstream until the maximum retry timeout. You can increase the parameter of "taskmanager.network.request-backoff.max" to check whether it works, the default value is 10s.

BTW, you should check why the upstream registers its result partition delayed, maybe the upstream TaskManager received the task deployment delayed from JobManager, or some operations in upstream task initialization unexpectly cost more time before registering result partition. 

Best,
Zhijiang
------------------------------------------------------------------
发件人:Steffen Wohlers <st...@gmx.de>
发送时间:2018年7月22日(星期日) 22:22
收件人:user <us...@flink.apache.org>
主 题:Network PartitionNotFoundException when run on multi nodes

Hi all,

I have some problems when running my application on more than one Task Manager.

setup:
node1: Job Manager, Task Manager
node2: Task Manager

I can run my program successfully on each node alone when I stop the other Task Manager.
But when I start both and set parallelism = 2, every time I got the following exception (after 30 seconds):

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not found.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



It seems the problem occurs when a subtask is linked to both Task Manager.

Does anybody know how I can make it work?

Thanks,
Steffen



Re: Network PartitionNotFoundException when run on multi nodes

Posted by Steffen Wohlers <st...@gmx.de>.
Hi Zhijiang, Minglei, all,

your both hints and explanations work well. Thank you very much!

Thanks,
Steffen

> On 23. Jul 2018, at 08:10, Zhijiang(wangzhijiang999) <wa...@aliyun.com> wrote:
> 
> Hi Steffen,
> 
> This exception indicates that when the downstream task requests partition from the upstream task, the upstream task has not initialized to register its result partition.
> In this case, the downstream task will inquire the state from job manager, and then retry to request partition from the upstream until the maximum retry timeout. You can increase the parameter of "taskmanager.network.request-backoff.max" to check whether it works, the default value is 10s.
> 
> BTW, you should check why the upstream registers its result partition delayed, maybe the upstream TaskManager received the task deployment delayed from JobManager, or some operations in upstream task initialization unexpectly cost more time before registering result partition. 
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Steffen Wohlers <st...@gmx.de>
> 发送时间:2018年7月22日(星期日) 22:22
> 收件人:user <us...@flink.apache.org>
> 主 题:Network PartitionNotFoundException when run on multi nodes
> 
> Hi all,
> 
> I have some problems when running my application on more than one Task Manager.
> 
> setup:
> node1: Job Manager, Task Manager
> node2: Task Manager
> 
> I can run my program successfully on each node alone when I stop the other Task Manager.
> But when I start both and set parallelism = 2, every time I got the following exception (after 30 seconds):
> 
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not found.
> 	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
> 	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
> 	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
> 	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
> It seems the problem occurs when a subtask is linked to both Task Manager.
> 
> Does anybody know how I can make it work?
> 
> Thanks,
> Steffen
> 
>