Posted to dev@flink.apache.org by "fanrui (Jira)" <ji...@apache.org> on 2021/12/22 11:20:00 UTC

[jira] [Created] (FLINK-25417) Too many connections for TM

fanrui created FLINK-25417:
------------------------------

             Summary: Too many connections for TM
                 Key: FLINK-25417
                 URL: https://issues.apache.org/jira/browse/FLINK-25417
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Network
    Affects Versions: 1.14.2, 1.13.5, 1.15.0
            Reporter: fanrui
         Attachments: image-2021-12-22-19-17-59-486.png, image-2021-12-22-19-18-23-138.png

Hi masters, when the number of tasks exceeds 10, some TMs have more than 4000 TCP connections.

!image-2021-12-22-19-17-59-486.png|width=1388,height=307!

 
h2. Reason:

When the task is initialized, the downstream InputChannel will connect to the upstream ResultPartition.

In PartitionRequestClientFactory#createPartitionRequestClient, there is a map named clients (_ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>> clients_). It is a cache that avoids opening repeated TCP connections. However, ConnectionID also has a connectionIndex field, so cache entries differ per connectionIndex even when the remote address is the same.

The connectionIndex comes from IntermediateResult and is a random number. When multiple Tasks run in a TM, other TMs have to establish multiple connections to that TM, and each Task has its own NettyPartitionRequestClient.

Assume that the parallelism of the Flink job is 100, each TM runs 20 Tasks, and the partition strategy between tasks is rebalance or hash. Then the number of connections for a single TM is (20-1) * 100 * 2 = 3800. If multiple such TMs run on a single node, the connection count on that node multiplies accordingly, which is a risk.
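
As a quick check of the arithmetic above, here is a minimal sketch that only reproduces the formula from this report. The class name, method name, and parameter names are hypothetical and are not part of Flink. The constant multiplier 2 is taken from the formula as written, and its meaning is not spelled out here.

```java
public final class ConnectionEstimate {

    /**
     * Reproduces the estimate (tasksPerTm - 1) * parallelism * factor.
     * Parameter names are illustrative assumptions, not Flink terminology.
     */
    public static long connectionsPerTm(int tasksPerTm, int parallelism, int factor) {
        return (long) (tasksPerTm - 1) * parallelism * factor;
    }

    public static void main(String[] args) {
        // Numbers from the report: 20 Tasks per TM, parallelism 100, multiplier 2.
        System.out.println(connectionsPerTm(20, 100, 2)); // prints 3800
    }
}
```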

 

Would it be risky to change the cache key to connectionID.address? That is, all Tasks of a TM would share a single TCP connection to a given remote address.

I guess it is feasible because:
 # I have tested it, and the tasks run normally.

 # The Message contains the InputChannelID, which identifies the channel that a NettyMessage belongs to.
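
To illustrate the proposed key change, here is a minimal, self-contained sketch. It is not Flink code. ConnectionKey and Client are hypothetical stand-ins for ConnectionID and NettyPartitionRequestClient, and the address string is made up. It only shows how many cache entries (and therefore connections) result from keying by address plus connectionIndex versus keying by address alone.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class ClientCacheSketch {

    /** Hypothetical stand-in for ConnectionID: remote address plus connection index. */
    static final class ConnectionKey {
        final String address;
        final int connectionIndex;

        ConnectionKey(String address, int connectionIndex) {
            this.address = address;
            this.connectionIndex = connectionIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ConnectionKey)) {
                return false;
            }
            ConnectionKey other = (ConnectionKey) o;
            return connectionIndex == other.connectionIndex && address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return Objects.hash(address, connectionIndex);
        }
    }

    /** Hypothetical stand-in for NettyPartitionRequestClient. */
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    /** Current behaviour as described above: one cache entry per (address, connectionIndex). */
    public static int clientsKeyedByConnectionId(String address, int[] connectionIndexes) {
        ConcurrentMap<ConnectionKey, CompletableFuture<Client>> clients = new ConcurrentHashMap<>();
        for (int index : connectionIndexes) {
            clients.computeIfAbsent(
                    new ConnectionKey(address, index),
                    key -> CompletableFuture.completedFuture(new Client(key.address)));
        }
        return clients.size();
    }

    /** Proposed behaviour: one cache entry per remote address, shared by all indexes. */
    public static int clientsKeyedByAddress(String address, int[] connectionIndexes) {
        ConcurrentMap<String, CompletableFuture<Client>> clients = new ConcurrentHashMap<>();
        for (int ignored : connectionIndexes) {
            clients.computeIfAbsent(
                    address, a -> CompletableFuture.completedFuture(new Client(a)));
        }
        return clients.size();
    }

    public static void main(String[] args) {
        int[] indexes = {17, 42, 99}; // three distinct, arbitrary connection indexes
        System.out.println(clientsKeyedByConnectionId("tm-1:6121", indexes)); // prints 3
        System.out.println(clientsKeyedByAddress("tm-1:6121", indexes));      // prints 1
    }
}
```

With the address as the key, every lookup for the same remote TM resolves to the same cached client, so messages for different channels would have to be told apart by the InputChannelID they carry, as noted in the list above.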

 

!image-2021-12-22-19-18-23-138.png|width=2953,height=686!


