Posted to issues@flink.apache.org by "Huameng Li (Jira)" <ji...@apache.org> on 2022/06/21 14:29:00 UTC

[jira] [Comment Edited] (FLINK-28115) Flink 1.15.0 Parallelism Rebalance causes flink job failure

    [ https://issues.apache.org/jira/browse/FLINK-28115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556929#comment-17556929 ] 

Huameng Li edited comment on FLINK-28115 at 6/21/22 2:28 PM:
-------------------------------------------------------------

Hi Jing and Qingsheng, thanks for looking into this parallelism rebalance issue.

The exception stack trace is from the TaskManager (TM) log.

This issue occurs only in Flink 1.15.0. It happens right after job submission, when the parallelism of the map/process function differs from that of the source or sink.

The issue is not related to the Flink Kafka connector. The TaskManager is still alive when the issue occurs.
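
For reference, "Connection refused" means the TCP connection attempt reached the target address but no process was accepting connections on that port; by itself it does not show that the remote TaskManager process has died. A minimal sketch using plain java.net sockets reproduces the same java.net.ConnectException type seen at the bottom of the stack trace by connecting to a port that was just released. The class and method names are hypothetical, and this is not Flink's Netty-based partition request client:

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; illustrates the exception type only, not Flink internals.
public class ConnectionRefusedDemo {

    // Opens and closes a TCP connection; returns true on success,
    // false if the connection is refused (host reachable, no listener on the port).
    static boolean tryConnect(String host, int port, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (ConnectException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        int port;
        try (ServerSocket server = new ServerSocket(0)) { // the OS picks a free port
            port = server.getLocalPort();
            // The kernel completes the handshake via the listen backlog, so this succeeds.
            System.out.println("while listening: " + tryConnect("127.0.0.1", port, 1000));
        }
        // The listener is closed now, so the same attempt is refused.
        System.out.println("after close: " + tryConnect("127.0.0.1", port, 1000));
    }
}
```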

*Job topology:*

*kafkaSource (parallelism 4) -> map/processor (parallelism 8) -> kafkaSink (parallelism 4)*

*Our dev Flink cluster has 8 hosts.* *Each host has 25 TaskManagers alive.* *Each TM has 2 task slots.*
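
As a rough capacity check, here is a sketch of the slot arithmetic for this cluster. It assumes Flink's default of a single slot sharing group, so a job needs as many slots as its highest operator parallelism. The class and method names are hypothetical illustrations, not Flink APIs:

```java
// Hypothetical capacity estimate for the cluster and job described above; not a Flink API.
public class SlotEstimate {

    static int totalSlots(int hosts, int taskManagersPerHost, int slotsPerTaskManager) {
        return hosts * taskManagersPerHost * slotsPerTaskManager;
    }

    // Assumes all operators share one slot sharing group (Flink's default),
    // so the job needs as many slots as its highest operator parallelism.
    static int slotsNeeded(int... operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println("TaskManagers: " + (8 * 25));
        System.out.println("Total slots: " + totalSlots(8, 25, 2));
        System.out.println("Slots needed (4 -> 8 -> 4): " + slotsNeeded(4, 8, 4));
    }
}
```

Under that assumption the job occupies only 8 of the 400 slots, so its subtasks may be spread across several of the 200 TaskManagers; subtasks placed on different TaskManagers exchange data over the network through remote input channels (RemoteInputChannel in the stack trace).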

Interestingly, if a job uses the same parallelism (no rebalance) for *source* (parallelism {*}6{*}) -> *map* (parallelism {*}6{*}) -> *sink* (parallelism {*}6{*}), the job runs fine.
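
One plausible reason the equal-parallelism job is unaffected: by default, Flink connects two operators with forward partitioning when their parallelism matches (which also allows them to be chained into one task) and with rebalance (round-robin) partitioning when it differs. The sketch below models only the resulting channel counts and the round-robin assignment. The class and method names are hypothetical, and Flink's actual rebalance partitioner starts at a random channel rather than channel 0:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the data exchange between two operators; not Flink code.
public class RebalanceSketch {

    // Number of (upstream subtask, downstream subtask) channels between two operators.
    static int channelCount(int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism == downstreamParallelism) {
            // Forward: subtask i feeds only subtask i (and the operators may be chained).
            return upstreamParallelism;
        }
        // Rebalance: every upstream subtask feeds every downstream subtask.
        return upstreamParallelism * downstreamParallelism;
    }

    // Round-robin targets for records emitted by one upstream subtask (simplified: starts at 0).
    static List<Integer> roundRobin(int records, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        int next = 0;
        for (int r = 0; r < records; r++) {
            targets.add(next);
            next = (next + 1) % downstreamParallelism;
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println("source(4) -> map(8): " + channelCount(4, 8) + " channels");
        System.out.println("map(8) -> sink(4): " + channelCount(8, 4) + " channels");
        System.out.println("source(6) -> map(6): " + channelCount(6, 6) + " channels");
        System.out.println("one source subtask, 10 records -> " + roundRobin(10, 8));
    }
}
```

With 4 -> 8 -> 4, each map subtask therefore reads from all 4 source subtasks (32 channels per exchange), whereas 6 -> 6 -> 6 needs no shuffle between the operators at all.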


was (Author: JIRAUSER291148):
Hi Jing and Qingsheng, thanks for looking into this parallelism rebalance issue.

The exception stack trace is from the TaskManager (TM) log.

This issue occurs only in Flink 1.15.0. It happens right after job submission, when the parallelism of the map/process function differs from that of the source or sink.

The issue is not related to the Flink Kafka connector. The TaskManager is still alive when the issue occurs.

If we use the same parallelism (no rebalance) for *source* (parallelism {*}4{*}) -> *map* (parallelism {*}4{*}) -> *sink* (parallelism {*}4{*}), the job runs fine.

> Flink 1.15.0 Parallelism Rebalance causes flink job failure
> -----------------------------------------------------------
>
>                 Key: FLINK-28115
>                 URL: https://issues.apache.org/jira/browse/FLINK-28115
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0
>         Environment: Flink 1.15.0 session cluster with 8 hosts.
>            Reporter: Huameng Li
>            Priority: Major
>         Attachments: image-2022-06-17-13-01-08-992.png
>
>
> {color:#de350b}*Issue:*{color}
> *Flink 1.15.0 Parallelism Rebalance causes flink job failure.* The same issue did not occur in Flink 1.14.4.
> {color:#de350b}*Exceptions:*{color}
> *One of the 8 task slots of the rebalanced map operator failed due to:*
> *{color:#de350b}org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:43354{color}*
> *{color:#de350b}Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused{color}*
>  
> *Job topology:*
> *kafkaSource (parallelism 4) -> map/processor (parallelism 8) -> kafkaSink (parallelism 4)*
> *Our dev Flink cluster has 8 hosts.* *Each host has 25 TaskManagers alive.* *Each TM has 2 task slots.*
> *!image-2022-06-17-13-01-08-992.png!*
>  
>  
> *Error stack trace:*
> 2022-06-17 12:54:38.563 WARN  [Framework] [Map (3/8)#5] org.apache.flink.runtime.taskmanager.Task  - Map (3/8)#5 (69a82f741d68fd7161d7b13de48c6c4b) switched from RUNNING to FAILED with failure cause: org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 2064424258b3b74fdc349607017f1029#1@a94744160d7d5e85101881e2d783dcd2 not reachable.
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:190)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:342)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:312)
>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:115)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/127.0.0.1:43354' has failed. This might indicate that the remote task manager has been lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:169)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:135)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:96)
>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:95)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:186)
>     ... 15 more
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:43354
> Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
>     at org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155)
>     at org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128)
>     at org.apache.flink.shaded.netty4.io.netty.channel.unix.Socket.finishConnect(Socket.java:320)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)