Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2022/09/08 23:16:00 UTC

[jira] [Commented] (KAFKA-14189) Improve connection limit and reuse of coordinator and leader in KafkaConsumer

    [ https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602037#comment-17602037 ] 

Guozhang Wang commented on KAFKA-14189:
---------------------------------------

Hi [~aglicacha] [~vongosling]

The main motivation for using two separate sockets for the coordinator and the partition leader is to keep coordination-related requests such as join/sync from being blocked by fetch requests (which may be long polling, and during that time no other request can be sent on the same socket). Reusing the connection may cause issues, e.g. a heartbeat request not being processed in time because a fetch request is already parked on the broker side.
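
As a rough illustration of the blocking concern, the sketch below models a connection that serves requests strictly in order, so a request completes only after everything queued ahead of it. The class name, the method name, and the 500 ms / 5 ms figures are assumptions made for this example and are not taken from the Kafka code base.

```java
import java.util.List;

public class ConnectionOrderingSketch {

    /**
     * Hypothetical model: one connection serves its requests strictly in order.
     * Returns the time at which the last queued request completes.
     */
    static long completionTimeMs(List<Long> serviceTimesMs) {
        long elapsed = 0;
        for (long serviceTime : serviceTimesMs) {
            elapsed += serviceTime;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        long fetchParkedMs = 500L; // assumed long-poll wait of a parked fetch
        long heartbeatMs = 5L;     // assumed time to serve a heartbeat

        // Shared socket: the heartbeat queues behind the parked fetch.
        long shared = completionTimeMs(List.of(fetchParkedMs, heartbeatMs));
        // Separate socket: the heartbeat is the only request on its connection.
        long separate = completionTimeMs(List.of(heartbeatMs));

        System.out.println("heartbeat done after " + shared + " ms on a shared socket");
        System.out.println("heartbeat done after " + separate + " ms on its own socket");
    }
}
```

In this simplified model the heartbeat waits for the full fetch wait on a shared socket, which is the delay that a dedicated coordinator connection avoids.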

> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14189
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0
>            Reporter: Junyang Liu
>            Priority: Major
>
> The connection id of the coordinator connection in KafkaConsumer is Integer.MAX_VALUE - coordinator id, which differs from the connection id used for the partition leader. So the connection cannot be reused when the coordinator and the leader are on the same broker, which means we need two separate connections to the same broker. Consider this case: a consumer has connected to the coordinator and finished Join and Sync, and wants to send FETCH to the leader on the same broker. But the connection count has reached the limit, so the consumer will be in the group but cannot consume messages.
> Partial logs:
> {code:java}
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Added READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node <ip>:9092 (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). (org.apache.kafka.clients.FetchSessionHandler) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Sending READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 (id: 2 rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating connection to node <ip>:9092 (id: 2 rack: 2) using address /<ip> (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Using older server API v3 to send OFFSET_COMMIT {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]} with correlation id 242 to node 2147483645 (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Completed connection to node 2. Fetching API versions. (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating API versions fetch from node 2. (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Subscribed to topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Connection with /<ip> disconnected (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Node 2 disconnected. (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=consumer-11, correlationId=241) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException. (org.apache.kafka.clients.FetchSessionHandler){code}
> The connection to the coordinator, the rebalance and the offset fetch have finished. When the consumer sets up the connection to the leader for fetching, the connection limit has been reached, so after the TCP connection is established, the broker disconnects the client.
>  
> The root cause of this issue is that consuming relies on multiple connections (one to the coordinator and one to the leader on the same broker) that are not established atomically, which may lead to a "half connected" state. I think we can make some improvements:
>  # reuse the connection for the coordinator and the leader when they are on the same broker, to avoid such a "half connected" state in KafkaConsumer when the connection count limit is reached
>  # make the connection limit more flexible, such as allowing extra related connections for a consumer that is already connected to the broker when the connection count limit has been reached
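
The connection id arithmetic described in the issue can be sketched as follows. The formula comes from the description above; the class and method names are hypothetical and are not Kafka's actual API. For broker id 2 the result matches the node id 2147483645 that appears in the OFFSET_COMMIT log line.

```java
public class CoordinatorConnectionId {

    /**
     * Hypothetical helper mirroring the formula from the issue description:
     * the coordinator connection id is Integer.MAX_VALUE minus the broker id.
     */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        int brokerId = 2; // broker that is both partition leader and coordinator

        int leaderConnectionId = brokerId;
        int coordinatorId = coordinatorConnectionId(brokerId);

        // The two ids differ, so the client treats them as two separate nodes
        // and opens one connection for each.
        System.out.println("leader connection id:      " + leaderConnectionId);
        System.out.println("coordinator connection id: " + coordinatorId);
    }
}
```

Because the two ids never coincide for a valid broker id, a client that keys its connections by id has no way to share one socket between the two roles.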



--
This message was sent by Atlassian Jira
(v8.20.10#820010)