Posted to issues@flink.apache.org by "Yuan Mei (Jira)" <ji...@apache.org> on 2020/12/01 07:56:00 UTC

[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

    [ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241327#comment-17241327 ] 

Yuan Mei commented on FLINK-19249:
----------------------------------

The problem has been thoroughly explained in both FLINK-16030 and this ticket. In short, when the network environment is unstable, downstream TMs sometimes cannot react to such errors until the TCP-keepalive probe is triggered. The reason is:
 * Upstream netty handles an error/exception by 1) sending an `ErrorResponse` to the downstream and 2) simply releasing the sub-partition view resources, but nothing else. It relies on the downstream TM to handle the error.

But as we can see, the `ErrorResponse` may not reach the downstream due to the unstable environment, and the TCP keepalive cannot be set too short without unwanted side effects (the default is 2 hours).
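To make the upstream error path concrete, here is a minimal netty-style sketch. The handler, the `ErrorResponse` class, and the view-release call are simplified placeholders for illustration, not Flink's actual classes:

{code:java}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Hedged sketch of the upstream error path described above: on an
 * exception the handler 1) tries to send an error message downstream
 * and 2) releases the sub-partition view, relying entirely on the
 * downstream TM to react.
 */
public class UpstreamErrorHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 1) Best-effort notification; this write itself may never
        //    arrive if the network is already broken.
        ctx.writeAndFlush(new ErrorResponse(cause));

        // 2) Release local resources and close the channel; nothing
        //    else is done on the upstream side.
        releaseSubpartitionView();
        ctx.close();
    }

    private void releaseSubpartitionView() {
        // placeholder for releasing the sub-partition view resources
    }
}

/** Placeholder message type standing in for Flink's ErrorResponse. */
class ErrorResponse {
    final Throwable cause;
    ErrorResponse(Throwable cause) { this.cause = cause; }
}
{code}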

As a result, we probably need to do SOMETHING when the upstream detects such errors; the question is then how to detect them and what to do after detection. I put some ideas here for discussion, keeping in mind that this happens rarely.

----------------

*1. Where to detect the error?*

When an exception is caught, for example when failing to send data. This is a better place than `channelInactive`, since channelInactive may be triggered for various reasons. As long as the upstream fails to send data, the job loses data, since we do not retry when sending.
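Roughly, the two detection points differ like this in a netty handler (an illustrative sketch, not Flink's actual code):

{code:java}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Sketch contrasting the two candidate detection points discussed
 * above (illustrative only).
 */
public class DetectionPointsHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Precise signal: a concrete send/receive has failed, e.g.
        // "java.io.IOException: Connection timed out" on a write.
        // A good place to act, since the unsent data is already lost.
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Ambiguous signal: fires on any channel close, including an
        // orderly shutdown, so the cause cannot be distinguished here.
        ctx.fireChannelInactive();
    }
}
{code}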

*2. Should we allow reconnection from the downstream, or tolerate an intermittent network?*

This won't work unless we have retry logic on the upstream side. But `retry` also means waiting for a response, and that will certainly affect performance.
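For reference, upstream retry logic would look something like the following. This is hypothetical: as noted above, Flink does not retry sends today, and a real implementation would also need to retain or copy the buffers across attempts:

{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

/**
 * Hypothetical sketch of upstream write-with-retry, only to show why
 * retrying implies waiting on each write's outcome.
 */
public class RetryingWriter {

    private static final int MAX_RETRIES = 3;

    static void writeWithRetry(Channel channel, Object msg, int attempt) {
        ChannelFuture future = channel.writeAndFlush(msg);
        future.addListener(f -> {
            if (!f.isSuccess() && attempt < MAX_RETRIES) {
                // Each retry means holding on to the message and
                // waiting for the outcome of the previous write --
                // this is the performance cost mentioned above.
                writeWithRetry(channel, msg, attempt + 1);
            }
        });
    }
}
{code}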

*3. Is failing the job that contains the tasks unable to send data to the downstream enough?*

I think `yes` for now (in the streaming case: task failure -> entire-job failure), but this may not be extensible to batch or single-task failover.

The answer `yes` is also based on how physical TCP connections are currently shared: different jobs do not share TCP connections (please correct me if I am wrong).


I personally do not think "Job/Task failover" is the right direction to go. Conceptually this is a TM-level error; there is not, and probably should not be, a direct hook-up between netty -> task (it is still doable through ResultPartition, though).

*The more reasonable way* to go is to report the exception to the JM (enriching the exception); the JM decides how to react (in this case, the JM needs to restart both the upstream and downstream TMs) and, as a result, fails over all jobs running on both the upstream and downstream TMs.
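A rough sketch of that direction. Everything here is hypothetical, including the `JobManagerGateway#reportTaskManagerFailure` RPC; it only illustrates the proposed flow (TM enriches the exception, JM decides the reaction):

{code:java}
/**
 * Hypothetical sketch of the "report to JM" direction. None of these
 * types or methods exist in Flink.
 */
public class NetworkFailureReporter {

    interface JobManagerGateway {
        // hypothetical RPC: ship the enriched failure to the JM
        void reportTaskManagerFailure(String tmId, Throwable enriched);
    }

    private final JobManagerGateway jm;
    private final String localTmId;

    NetworkFailureReporter(JobManagerGateway jm, String localTmId) {
        this.jm = jm;
        this.localTmId = localTmId;
    }

    void onConnectionError(String remoteTmId, Throwable cause) {
        // Enrich with both endpoints so the JM can restart the upstream
        // and downstream TMs and fail over every job running on them.
        Throwable enriched = new RuntimeException(
                "Broken connection between TM " + localTmId
                        + " and TM " + remoteTmId, cause);
        jm.reportTaskManagerFailure(localTmId, enriched);
    }
}
{code}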


> Detect broken connections in case TCP Timeout takes too long.
> -------------------------------------------------------------
>
>                 Key: FLINK-19249
>                 URL: https://issues.apache.org/jira/browse/FLINK-19249
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Congxian Qiu
>            Assignee: Yuan Mei
>            Priority: Major
>
> {quote}encountered this error on 1.7, after going through the master code, I think the problem is still there
> {quote}
> When the network environment is not so good, the connection between the server and the client may be disconnected unexpectedly. After the disconnection, the server will receive an IOException such as the one below
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> and then release the view reader.
> But the job will not fail until the downstream detects the disconnection via {{channelInactive}} later (~10 min). During that time, the job can still process data, but the broken channel cannot transfer any data or events, so snapshots will fail. This causes the job to replay a lot of data after failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)