You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Von Gosling (Jira)" <ji...@apache.org> on 2022/08/30 12:51:00 UTC

[jira] [Commented] (KAFKA-14189) Improve connection limit and reuse of coordinator and leader in KafkaConsumer

    [ https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597851#comment-17597851 ] 

Von Gosling commented on KAFKA-14189:
-------------------------------------

I'd like to hear some suggestions from [~junrao]. Do we have the possibility to reuse the same connection in such conditions?

> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14189
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0
>            Reporter: Junyang Liu
>            Priority: Major
>
> The connection id of connection with coordinator in KafkaConsumer is Integer.MAX_VALUE - coordinator id, which is different with connection id of partition leader. So the connection cannot be reused when coordinator and leader are in the same broker, which means we need two seperated connections with the same broker. Suppose such case, a consumer has connected to the coordinator and finished Join and Sync, and wants to send FETCH to leader in the same broker. But the connection count has reached limit, so the consumer with be in the group but cannot consume messages
> partial logs:
> {code:java}
> Added READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node <ip>:9092 (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). (org.apache.kafka.clients.FetchSessionHandler)
> Sending READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> Initiating connection to node <ip>:9092 (id: 2 rack: 2) using address /<ip> (org.apache.kafka.clients.NetworkClient)
> Using older server API v3 to send OFFSET_COMMIT {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]} with correlation id 242 to node 2147483645 (org.apache.kafka.clients.NetworkClient)
> Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 (org.apache.kafka.common.network.Selector)
> Completed connection to node 2. Fetching API versions. (org.apache.kafka.clients.NetworkClient)
> Initiating API versions fetch from node 2. (org.apache.kafka.clients.NetworkClient)
> Subscribed to topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer)
> Connection with /<ip> disconnected (org.apache.kafka.common.network.Selector)
> Node 2 disconnected. (org.apache.kafka.clients.NetworkClient) {code}
> connection to coordinator, rebalance and fetching offsets have finished. when preparing connection to leader for fetching, the connection limit has reached, so after tcp connection, the broker disconnect the client.  
>  
> The root cause of this issue is that the process of consuming is a combination of multiple connections(connections with coordinator and leader in same broker), not atomic, which may leads to "half connected". I think we can do some improvement:
>  # reuse the connection with coordinator and leader in the same broker
>  # make the connection limit more flexible, such as allowing extra related connections of a consumer when the connection count limit has reached if it has connected to broker



--
This message was sent by Atlassian Jira
(v8.20.10#820010)