You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "windkithk (JIRA)" <ji...@apache.org> on 2019/05/08 05:50:00 UTC

[jira] [Updated] (KAFKA-8334) Occasional OffsetCommit Timeout

     [ https://issues.apache.org/jira/browse/KAFKA-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

windkithk updated KAFKA-8334:
-----------------------------
    Description: 
h2. 1) Issue Summary
Since we have upgraded to 1.1, we have observed occasional OffsetCommit timeouts from clients
{code:java}
Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out{code}

Normally OffsetCommit completes within 10ms but when we check the 99.9 percentile, we see the request duration time jumps up to 5000 ms (offsets.commit.timeout.ms)

Here is a screenshot of prometheus recording kafka_network_request_duration_milliseconds
 !offsetcommit_p99.9_cut.jpg| thumbnail!
and after checking the duration breakdown, most of the time was spent on "Remote" Scope

(Below is a request log line produced by inhouse slow request logger
{code:java}
[2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:35078-31125;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
{code}

h2. 2) What got changed in 1.1 from 0.10.2.1?

# Log Level Changed
In 1.1 Kafka Consumer, logging about timed out OffsetCommit is changed from DEBUG to WARN
# Group Lock is acquired when trying to complete DelayedProduce of OffsetCommit
This was added after 0.11.0.2
(Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
(PR: https://github.com/apache/kafka/pull/4103)
(in 1.1 https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
# Followers do incremental fetch
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
{quote}
OffsetCommit append a message of committed offset to partition of topic `__consumer_offsets`

During the append, it would create a DelayedProduce with lock to GroupMetadata.lock (ReentrantLock) and add to delayedProducePurgatory

When follower fetches the partition of topic `__consumer_offsets` and causes an increase in HighWaterMark, delayedProducePurgatory would be transversed and all operations related to the partition may be completed
{quote}

*DelayedProduce from OffsetCommit may not be completed, if the group metadata lock was held by others*

h2. 4) Reproduce
h4. Methodology
{code}
1. DelayedProduce on __consumer_offsets could not be completed if the group.lock is acquired by others
2. We spam requests like Heartbeat to keep acquiring group.lock
3. We keep sending OffsetCommit and check the processing time
{code}

h4. Reproduce Script
https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
# jammer.py - join the group "wilson-tester" and keep spamming Heartbeat
# tester.py - fetch one message and do a long processing (or sleep) and then commit the offset

h4. Result
||Seq||Operation||Lock||
|1|OffsetCommit Request	
|2|Append to local __consumer_offsets	
|3|DelayedProduce tryComplete	
|4|Added into delayedProducePurgatory	
|5|FetchFollower1 Fetch
|6|FetchFollower2 Fetch	
|7|Heartbeat Request|Acquired group.lock
|8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock
|9|Heartbeat Response|Released group.lock
| |(NO FetchFollower Requests on the partitions __consumer_offsets)
|10|OffsetCommit Response (Timeout)

h4. Trace Log
{code}
// The OffsetCommit Request
[2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:57032-29505;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
 
// Initial Check of DelayedProduce:tryCompleteElseWatch
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
[2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
 
// Follower fetching the new message in __consumer_offsets-48, and with Heartbeat in between
// DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.88.229:51084-10;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.117.57:39512-9;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
 
// OffsetCommit Timed out
[2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
{code}

h4. OffsetCommit, FetcherFollower, HeartBeat
When a FetchFollower comes and updates the HighWaterMark, it supposes to complete the DelayedProduce and so the OffsetCommit request.
However, it would only do it when it can obtain the GroupMetaDataLock (retry only once and immediately...)
As a result, the DelayedProduce would only be checked next time when FetchFollower comes (and updates the HighWaterMark).
Which usually means the next OffsetCommit, this explains why we observe OffsetCommit request timed out (high OffsetCommit latency) during low traffic time.


||OffsetCommit||FetcherFollower||HeartBeat||
| replicaManager.appendRecords | |
| → delayedProducePurgatory.tryCompleteElseWatch | |
| → tryComplete() | |
| → watchForOperation() | |
| → operation.maybeTryComplete() | |
| | partition.updateReplicaLogReadResult |
| | → tryCompleteDelayedRequests |
| | → delayedProducePurgatory.checkAndComplete |
| | → watchers.tryCompleteWatched |
| | → operation.maybeTryComplete() | group.lock
| | → group.tryLock |
| | → false |
| | | group.unlock

h2. 5) Solution
We can have a separate executor to later retry completing DelayedOperation which failed to obtain lock

  was:
h2. 1) Issue Summary
Since we have upgraded to 1.1, we have observed occasional OffsetCommit timeouts from clients
{code:java}
Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out{code}

Normally OffsetCommit completes within 10ms but when we check the 99.9 percentile, we see the request duration time jumps up to 5000 ms (offsets.commit.timeout.ms)

Here is a screenshot of prometheus recording kafka_network_request_duration_milliseconds
 !offsetcommit_p99.9_cut.jpg!
and after checking the duration breakdown, most of the time was spent on "Remote" Scope

(Below is a request log line produced by inhouse slow request logger
{code:java}
[2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:35078-31125;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
{code}

h2. 2) What got changed in 1.1 from 0.10.2.1?

# Log Level Changed
In 1.1 Kafka Consumer, logging about timed out OffsetCommit is changed from DEBUG to WARN
# Group Lock is acquired when trying to complete DelayedProduce of OffsetCommit
This was added after 0.11.0.2
(Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
(PR: https://github.com/apache/kafka/pull/4103)
(in 1.1 https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
# Followers do incremental fetch
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
{quote}
OffsetCommit append a message of committed offset to partition of topic `__consumer_offsets`

During the append, it would create a DelayedProduce with lock to GroupMetadata.lock (ReentrantLock) and add to delayedProducePurgatory

When follower fetches the partition of topic `__consumer_offsets` and causes an increase in HighWaterMark, delayedProducePurgatory would be transversed and all operations related to the partition may be completed
{quote}

*DelayedProduce from OffsetCommit may not be completed, if the group metadata lock was held by others*

h2. 4) Reproduce
h4. Methodology
{code}
1. DelayedProduce on __consumer_offsets could not be completed if the group.lock is acquired by others
2. We spam requests like Heartbeat to keep acquiring group.lock
3. We keep sending OffsetCommit and check the processing time
{code}

h4. Reproduce Script
https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
# jammer.py - join the group "wilson-tester" and keep spamming Heartbeat
# tester.py - fetch one message and do a long processing (or sleep) and then commit the offset

h4. Result
||Seq||Operation||Lock||
|1|OffsetCommit Request	
|2|Append to local __consumer_offsets	
|3|DelayedProduce tryComplete	
|4|Added into delayedProducePurgatory	
|5|FetchFollower1 Fetch
|6|FetchFollower2 Fetch	
|7|Heartbeat Request|Acquired group.lock
|8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock
|9|Heartbeat Response|Released group.lock
| |(NO FetchFollower Requests on the partitions __consumer_offsets)
|10|OffsetCommit Response (Timeout)

h4. Trace Log
{code}
// The OffsetCommit Request
[2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:57032-29505;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
 
// Initial Check of DelayedProduce:tryCompleteElseWatch
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
[2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
 
// Follower fetching the new message in __consumer_offsets-48, and with Heartbeat in between
// DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.88.229:51084-10;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.117.57:39512-9;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
 
// OffsetCommit Timed out
[2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
{code}

h4. OffsetCommit, FetcherFollower, HeartBeat
When a FetchFollower comes and updates the HighWaterMark, it supposes to complete the DelayedProduce and so the OffsetCommit request.
However, it would only do it when it can obtain the GroupMetaDataLock (retry only once and immediately...)
As a result, the DelayedProduce would only be checked next time when FetchFollower comes (and updates the HighWaterMark).
Which usually means the next OffsetCommit, this explains why we observe OffsetCommit request timed out (high OffsetCommit latency) during low traffic time.


||OffsetCommit||FetcherFollower||HeartBeat||
| replicaManager.appendRecords | |
| → delayedProducePurgatory.tryCompleteElseWatch | |
| → tryComplete() | |
| → watchForOperation() | |
| → operation.maybeTryComplete() | |
| | partition.updateReplicaLogReadResult |
| | → tryCompleteDelayedRequests |
| | → delayedProducePurgatory.checkAndComplete |
| | → watchers.tryCompleteWatched |
| | → operation.maybeTryComplete() | group.lock
| | → group.tryLock |
| | → false |
| | | group.unlock

h2. 5) Solution
We can have a separate executor to later retry completing DelayedOperation which failed to obtain lock


> Occasional OffsetCommit Timeout
> -------------------------------
>
>                 Key: KAFKA-8334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8334
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1
>            Reporter: windkithk
>            Priority: Major
>         Attachments: offsetcommit_p99.9_cut.jpg
>
>
> h2. 1) Issue Summary
> Since we have upgraded to 1.1, we have observed occasional OffsetCommit timeouts from clients
> {code:java}
> Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out{code}
> Normally OffsetCommit completes within 10ms but when we check the 99.9 percentile, we see the request duration time jumps up to 5000 ms (offsets.commit.timeout.ms)
> Here is a screenshot of prometheus recording kafka_network_request_duration_milliseconds
>  !offsetcommit_p99.9_cut.jpg| thumbnail!
> and after checking the duration breakdown, most of the time was spent on "Remote" Scope
> (Below is a request log line produced by inhouse slow request logger
> {code:java}
> [2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:35078-31125;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
> {code}
> h2. 2) What got changed in 1.1 from 0.10.2.1?
> # Log Level Changed
> In 1.1 Kafka Consumer, logging about timed out OffsetCommit is changed from DEBUG to WARN
> # Group Lock is acquired when trying to complete DelayedProduce of OffsetCommit
> This was added after 0.11.0.2
> (Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
> (PR: https://github.com/apache/kafka/pull/4103)
> (in 1.1 https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
> # Followers do incremental fetch
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
> {quote}
> OffsetCommit append a message of committed offset to partition of topic `__consumer_offsets`
> During the append, it would create a DelayedProduce with lock to GroupMetadata.lock (ReentrantLock) and add to delayedProducePurgatory
> When follower fetches the partition of topic `__consumer_offsets` and causes an increase in HighWaterMark, delayedProducePurgatory would be transversed and all operations related to the partition may be completed
> {quote}
> *DelayedProduce from OffsetCommit may not be completed, if the group metadata lock was held by others*
> h2. 4) Reproduce
> h4. Methodology
> {code}
> 1. DelayedProduce on __consumer_offsets could not be completed if the group.lock is acquired by others
> 2. We spam requests like Heartbeat to keep acquiring group.lock
> 3. We keep sending OffsetCommit and check the processing time
> {code}
> h4. Reproduce Script
> https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
> # jammer.py - join the group "wilson-tester" and keep spamming Heartbeat
> # tester.py - fetch one message and do a long processing (or sleep) and then commit the offset
> h4. Result
> ||Seq||Operation||Lock||
> |1|OffsetCommit Request	
> |2|Append to local __consumer_offsets	
> |3|DelayedProduce tryComplete	
> |4|Added into delayedProducePurgatory	
> |5|FetchFollower1 Fetch
> |6|FetchFollower2 Fetch	
> |7|Heartbeat Request|Acquired group.lock
> |8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock
> |9|Heartbeat Response|Released group.lock
> | |(NO FetchFollower Requests on the partitions __consumer_offsets)
> |10|OffsetCommit Response (Timeout)
> h4. Trace Log
> {code}
> // The OffsetCommit Request
> [2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection 10.127.114.124:20992-10.127.144.181:57032-29505;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
>  
> // Initial Check of DelayedProduce:tryCompleteElseWatch
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
> [2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
>  
> // Follower fetching the new message in __consumer_offsets-48, and with Heartbeat in between
> // DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.88.229:51084-10;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection 10.127.114.124:20992-10.127.144.181:57028-29504;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection 10.127.114.124:20992-10.127.117.57:39512-9;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
>  
> // OffsetCommit Timed out
> [2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
> {code}
> h4. OffsetCommit, FetcherFollower, HeartBeat
> When a FetchFollower comes and updates the HighWaterMark, it supposes to complete the DelayedProduce and so the OffsetCommit request.
> However, it would only do it when it can obtain the GroupMetaDataLock (retry only once and immediately...)
> As a result, the DelayedProduce would only be checked next time when FetchFollower comes (and updates the HighWaterMark).
> Which usually means the next OffsetCommit, this explains why we observe OffsetCommit request timed out (high OffsetCommit latency) during low traffic time.
> ||OffsetCommit||FetcherFollower||HeartBeat||
> | replicaManager.appendRecords | |
> | → delayedProducePurgatory.tryCompleteElseWatch | |
> | → tryComplete() | |
> | → watchForOperation() | |
> | → operation.maybeTryComplete() | |
> | | partition.updateReplicaLogReadResult |
> | | → tryCompleteDelayedRequests |
> | | → delayedProducePurgatory.checkAndComplete |
> | | → watchers.tryCompleteWatched |
> | | → operation.maybeTryComplete() | group.lock
> | | → group.tryLock |
> | | → false |
> | | | group.unlock
> h2. 5) Solution
> We can have a separate executor to later retry completing DelayedOperation which failed to obtain lock



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)