Posted to jira@kafka.apache.org by "chenchao (Jira)" <ji...@apache.org> on 2021/09/01 07:21:00 UTC

[jira] [Commented] (KAFKA-8334) Occasional OffsetCommit Timeout

    [ https://issues.apache.org/jira/browse/KAFKA-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407907#comment-17407907 ] 

chenchao commented on KAFKA-8334:
---------------------------------

This problem also occurs in my production environment, but the brokers there cannot be upgraded.

Can I solve it by upgrading only the consumer client to 2.7.0 while my Kafka brokers stay on 2.3.0?

  [~junrao]

> Occasional OffsetCommit Timeout
> -------------------------------
>
>                 Key: KAFKA-8334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8334
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: windkithk
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.7.0
>
>         Attachments: kafka8334.patch, offsetcommit_p99.9_cut.jpg
>
>
> h2. 1) Issue Summary
> Since upgrading to 1.1, we have observed occasional OffsetCommit timeouts from clients:
> {code:java}
> Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out{code}
> Normally an OffsetCommit completes within 10 ms, but at the 99.9th percentile the request duration jumps to 5000 ms (offsets.commit.timeout.ms).
> Here is a screenshot of Prometheus recording kafka_network_request_duration_milliseconds:
> (offsetcommit_p99.9_cut.jpg)
> After checking the duration breakdown, we found that most of the time was spent in the "Remote" scope.
> Below is a request log line produced by our in-house slow request logger:
> {code:java}
> [2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
> {code}
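As a rough illustration of that breakdown, the timing fields of the log line above can be parsed and compared. This is only a sketch: the helper name `parse_timings` is hypothetical, and the input is just the timing portion copied from the log line. For this request, remoteTime accounts for more than 99.9% of totalTime.

```python
import re

# Timing fields copied from the slow-request log line above.
LOG_FIELDS = (
    "totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,"
    "remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000"
)


def parse_timings(fields: str) -> dict:
    """Hypothetical helper: return {name: milliseconds} for each `<name>Time:<float>` pair."""
    return {
        name: float(value)
        for name, value in re.findall(r"(\w+Time):([\d.]+)", fields)
    }


timings = parse_timings(LOG_FIELDS)

# Fraction of the total request time spent waiting on the "Remote" scope.
remote_share = timings["remoteTime"] / timings["totalTime"]
print(f"remoteTime share of totalTime: {remote_share:.4%}")
```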
> h2. 2) What got changed in 1.1 from 0.10.2.1?
> # Log Level Changed
> In the 1.1 Kafka consumer, the log level for a timed-out OffsetCommit was changed from DEBUG to WARN
> # Group Lock is acquired when trying to complete DelayedProduce of OffsetCommit
> This was added after 0.11.0.2
> (Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
> (PR: https://github.com/apache/kafka/pull/4103)
> (in 1.1 https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
> # Followers do incremental fetch
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
> {quote}
> An OffsetCommit appends a message with the committed offset to a partition of the topic `__consumer_offsets`.
> During the append, it creates a DelayedProduce that is tied to GroupMetadata.lock (a ReentrantLock) and adds it to delayedProducePurgatory.
> When a follower fetches that partition of `__consumer_offsets` and causes the HighWaterMark to advance, delayedProducePurgatory is traversed and all operations related to the partition may be completed.
> {quote}
> *The DelayedProduce from an OffsetCommit may not be completed if the group metadata lock is held by another thread*
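The missed completion described above can be sketched in a few lines. This is a simplified model, not the broker's Scala code: the class `DelayedProduceSketch` and its fields are hypothetical, and a non-reentrant `threading.Lock` is used (instead of a reentrant lock) so that a single thread can stand in for the competing lock holder.

```python
import threading


class DelayedProduceSketch:
    """Hypothetical stand-in for a DelayedProduce guarded by the group lock."""

    def __init__(self, group_lock):
        self.group_lock = group_lock
        self.high_watermark_reached = False
        self.completed = False

    def maybe_try_complete(self) -> bool:
        # Non-blocking acquire, like a tryLock: give up at once if the lock is busy.
        if not self.group_lock.acquire(blocking=False):
            return False
        try:
            if self.high_watermark_reached:
                self.completed = True
            return self.completed
        finally:
            self.group_lock.release()


group_lock = threading.Lock()
op = DelayedProduceSketch(group_lock)
op.high_watermark_reached = True       # the follower fetch advanced the HighWaterMark

group_lock.acquire()                   # stands in for a Heartbeat holding group.lock
skipped = op.maybe_try_complete()      # the completion check is skipped
group_lock.release()                   # Heartbeat done, lock released

retried = op.maybe_try_complete()      # succeeds only if something checks again
```

In the sketch the operation was ready to complete during the first check, but nothing completes it until a second check is triggered.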
> h2. 4) Reproduce
> h4. Methodology
> {code}
> 1. DelayedProduce on __consumer_offsets cannot be completed if the group.lock is held by others
> 2. We spam requests like Heartbeat to keep acquiring group.lock
> 3. We keep sending OffsetCommit and check the processing time
> {code}
> h4. Reproduce Script
> https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
> # jammer.py - join the group "wilson-tester" and keep spamming Heartbeat
> # tester.py - fetch one message and do a long processing (or sleep) and then commit the offset
> h4. Result
> ||Seq||Operation||Lock||
> |1|OffsetCommit Request| |
> |2|Append to local __consumer_offsets| |
> |3|DelayedProduce tryComplete| |
> |4|Added into delayedProducePurgatory| |
> |5|FetchFollower1 Fetch| |
> |6|FetchFollower2 Fetch| |
> |7|Heartbeat Request|Acquired group.lock|
> |8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock|
> |9|Heartbeat Response|Released group.lock|
> | |(NO FetchFollower Requests on the partitions __consumer_offsets)| |
> |10|OffsetCommit Response (Timeout)| |
> h4. Trace Log
> {code}
> // The OffsetCommit Request
> [2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
>  
> // Initial Check of DelayedProduce:tryCompleteElseWatch
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
> [2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
>  
> // Follower fetching the new message in __consumer_offsets-48, and with Heartbeat in between
> // DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
>  
> // OffsetCommit Timed out
> [2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
> {code}
> h4. OffsetCommit, FetcherFollower, HeartBeat
> When a FetchFollower request arrives and updates the HighWaterMark, it is supposed to complete the DelayedProduce and, with it, the OffsetCommit request.
> However, it only does so if it can obtain the GroupMetadata lock (it retries only once, immediately).
> As a result, the DelayedProduce is only checked again the next time a FetchFollower request arrives and updates the HighWaterMark.
> That usually happens only on the next OffsetCommit, which explains why we observe OffsetCommit timeouts (high OffsetCommit latency) during low-traffic periods.
> ||OffsetCommit||FetcherFollower||HeartBeat||
> | replicaManager.appendRecords | | |
> | → delayedProducePurgatory.tryCompleteElseWatch | | |
> | → tryComplete() | | |
> | → watchForOperation() | | |
> | → operation.maybeTryComplete() | | |
> | | partition.updateReplicaLogReadResult | |
> | | → tryCompleteDelayedRequests | |
> | | → delayedProducePurgatory.checkAndComplete | |
> | | → watchers.tryCompleteWatched | |
> | | → operation.maybeTryComplete() | group.lock |
> | | → group.tryLock | |
> | | → false | |
> | | | group.unlock |
> h2. 5) Solution
> We can use a separate executor to retry, at a later time, completing any DelayedOperation that failed to obtain the lock
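One possible shape of the proposal in the Solution section, as a minimal sketch only. It is not the patch attached to the ticket and not the broker's actual code: `DelayedOperationSketch`, `check_and_complete`, and `retry_executor` are hypothetical names, and a non-reentrant `threading.Lock` stands in for the group lock so the main thread can play the competing Heartbeat.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class DelayedOperationSketch:
    """Hypothetical stand-in for a DelayedOperation guarded by a group lock."""

    def __init__(self, lock):
        self.lock = lock
        self.completed = False

    def try_complete(self) -> bool:
        # Non-blocking attempt, like tryLock: skip if the lock is busy.
        if not self.lock.acquire(blocking=False):
            return False
        try:
            self.completed = True
            return True
        finally:
            self.lock.release()

    def complete_blocking(self) -> bool:
        # Retry path: wait for the lock instead of giving up.
        with self.lock:
            self.completed = True
        return True


def check_and_complete(op, retry_executor):
    """Try once inline; on lock contention, hand the retry to the executor."""
    if op.try_complete():
        return None
    return retry_executor.submit(op.complete_blocking)


group_lock = threading.Lock()
op = DelayedOperationSketch(group_lock)

with ThreadPoolExecutor(max_workers=1) as retry_executor:
    group_lock.acquire()                     # stands in for a Heartbeat holding group.lock
    retry = check_and_complete(op, retry_executor)
    completed_while_locked = op.completed    # still pending: the retry is waiting on the lock
    group_lock.release()                     # Heartbeat response sent, lock released
    retry_result = retry.result(timeout=5)   # the retry now completes the operation
```

The design point is that the fetch path stays non-blocking, while the operation that lost the lock race is completed shortly after the lock is released instead of waiting for the next follower fetch.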



--
This message was sent by Atlassian Jira
(v8.3.4#803005)