Posted to dev@kafka.apache.org by "Lucas Wang (JIRA)" <ji...@apache.org> on 2018/05/31 18:10:00 UTC

[jira] [Created] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

Lucas Wang created KAFKA-6974:
---------------------------------

             Summary: Changes the interaction between request handler threads and fetcher threads into an ASYNC model
                 Key: KAFKA-6974
                 URL: https://issues.apache.org/jira/browse/KAFKA-6974
             Project: Kafka
          Issue Type: Improvement
            Reporter: Lucas Wang


Problem Statement:
At LinkedIn, our clients occasionally complain about receiving constant NotLeaderForPartition exceptions.

Investigations:
In one case we investigated, the cluster was going through a rolling bounce. Based on the "Broker xxx handling LeaderAndIsr request" entries in the state change log, we saw a ~8-minute delay between the old partition leader resigning and the new leader becoming active.
Our monitoring shows that the LeaderAndISR request local time went up to ~4 minutes during the incident.


Explanations:
One possible explanation of the ~8 minutes of delay is:
During controlled shutdown of a broker, the partitions whose leaders reside on the shutting-down broker need to go through leadership transitions. The controller processes these partitions in batches, with each batch containing config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
If the 1st LeaderAndISR request sent to a new leader broker takes too long, e.g. 4 minutes, then the subsequent LeaderAndISR requests can have an accumulated delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that subsequent LeaderAndISR requests are blocked in a muted channel, since only one LeaderAndISR request can be processed at a time with a maxInFlightRequestsPerConnection setting of 1. When that happens, no existing metric shows the total delay of 8 or 12 minutes for the muted requests.
Now the question is why it took ~4 minutes for the 1st LeaderAndISR request to finish.
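
The accumulation described above can be made concrete with a small sketch. It assumes, purely for illustration, that every LeaderAndISR request on the connection takes the same local time and that requests are processed strictly one at a time; the class and method names are hypothetical and do not exist in Kafka.

```java
// Hypothetical illustration, not Kafka code.
final class MutedChannelDelaySketch {
    // Time (in minutes) the k-th request (1-based) spends waiting in the muted
    // channel before it is processed, assuming every earlier request on the
    // connection took localTimeMinutes and only one request is in flight.
    static long queueingDelayMinutes(int requestIndex, long localTimeMinutes) {
        return (long) (requestIndex - 1) * localTimeMinutes;
    }
}
```

Under these assumptions, with a 4-minute local time the 2nd, 3rd and 4th requests wait 4, 8 and 12 minutes respectively before processing even starts.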

Explanation for the ~4 minutes of local time for LeaderAndISR request:
While processing a LeaderAndISR request, the request handler thread needs to add partitions to or remove partitions from the partitionStates field of the ReplicaFetcherThread, and also shut down idle fetcher threads by checking the size of the partitionStates field. On the other hand, background fetcher threads need to iterate through all the partitions in partitionStates in order to build fetch requests and process fetch responses. The synchronization between the request handler thread and the fetcher threads is done through a partitionMapLock.
Specifically, the fetcher threads may acquire the partitionMapLock and then call the following functions to process the fetch response:
(1) processPartitionData, which in turn calls 
(2) Replica.maybeIncrementLogStartOffset, which calls 
(3) Log.maybeIncrementLogStartOffset, which calls 
(4) LeaderEpochCache.clearAndFlushEarliest.
Two factors contribute to the partitionMapLock being held for a long time:
1. Function (4) above entails calling sync() to make sure data is persisted to disk, which may have a long latency.
2. All 4 functions above can potentially be called for each partition in the fetch response, multiplying the sync() latency by a factor of n.

The end result is that the request handler thread was blocked for a long time trying to acquire the partitionMapLock of some fetcher inside AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each fetcher's partitionCount requires acquiring the partitionMapLock.
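
The contention described above can be reproduced in miniature. The following is a minimal sketch, not Kafka code: the class and method names are hypothetical, Thread.sleep stands in for the sync() call, and a plain ReentrantLock plays the role of the partitionMapLock. One thread holds the lock while "flushing" once per partition, and a second thread measures how long it waits for the same lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration, not Kafka code.
final class LockContentionSketch {
    // Returns how long (in ms) a simulated request handler waited for the lock
    // while a simulated fetcher processed `partitions` partitions under it.
    static long measureHandlerWaitMillis(int partitions, long syncMillis) {
        ReentrantLock partitionMapLock = new ReentrantLock();
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            partitionMapLock.lock();
            try {
                lockHeld.countDown(); // signal that the fetcher now owns the lock
                for (int i = 0; i < partitions; i++) {
                    Thread.sleep(syncMillis); // stand-in for one sync() per partition
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                partitionMapLock.unlock();
            }
        });
        fetcher.start();

        try {
            lockHeld.await(); // make sure the fetcher grabbed the lock first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        long start = System.nanoTime();
        partitionMapLock.lock(); // what a partition-count check would need to do
        try {
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            partitionMapLock.unlock();
        }
    }
}
```

In this sketch the handler's wait grows roughly with partitions × syncMillis, which mirrors the factor-of-n effect described above.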

In our testing environment, we reproduced the problem and confirmed the explanation above: a request handler thread was blocked for 10 seconds trying to acquire the partitionMapLock of one particular fetcher thread, while many log entries showed "Incrementing log start offset of partition..."

Proposed change:
We propose to change the interaction between the request handler threads and the fetcher threads to an ASYNC model by using an event queue. All requests to add or remove partitions, or to shut down idle fetcher threads, are modeled as items in the event queue. Only the fetcher threads can take items out of the event queue and actually process them.
In the new ASYNC model, in order to be able to process an infinite sequence of FetchRequests, a fetcher thread initially has one FetchRequest, and after it's done processing one FetchRequest, it enqueues one more into its own event queue.
Also, since the current AbstractFetcherThread logic is inherited by both the replica-fetcher-threads and the consumer-fetcher-threads for the old consumer, and the latter has been deprecated, we plan to implement the ASYNC model with a clean-slate approach and only support the replica-fetcher-threads, in order to make the code cleaner.
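
To illustrate the event-queue model proposed above, here is a minimal sketch. It is not the actual implementation: every name in it (AsyncFetcherSketch, Event, the Type values, demo) is hypothetical, the fetch itself is a placeholder sleep, and a java.util.concurrent.LinkedBlockingQueue is assumed as the queue. The point it tries to show is that request handler threads only enqueue items, while the fetcher thread is the sole owner of its partition set and re-enqueues a fetch item after each fetch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration, not Kafka code.
final class AsyncFetcherSketch implements Runnable {
    enum Type { ADD_PARTITION, REMOVE_PARTITION, FETCH, SHUTDOWN_IF_IDLE }

    static final class Event {
        final Type type;
        final String partition; // null for FETCH and SHUTDOWN_IF_IDLE
        Event(Type type, String partition) {
            this.type = type;
            this.partition = partition;
        }
    }

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    // Touched only by the fetcher thread, so no partitionMapLock is needed.
    private final Set<String> partitions = new HashSet<>();
    private int fetchesProcessed = 0;

    AsyncFetcherSketch() {
        // The fetcher starts with exactly one fetch item in its queue.
        events.add(new Event(Type.FETCH, null));
    }

    // Called by request handler threads; returns without waiting for the fetcher.
    void enqueue(Type type, String partition) {
        events.add(new Event(type, partition));
    }

    @Override
    public void run() {
        try {
            while (true) {
                Event event = events.take();
                switch (event.type) {
                    case ADD_PARTITION:
                        partitions.add(event.partition);
                        break;
                    case REMOVE_PARTITION:
                        partitions.remove(event.partition);
                        break;
                    case FETCH:
                        Thread.sleep(1); // placeholder for one fetch round trip
                        fetchesProcessed++;
                        events.add(new Event(Type.FETCH, null)); // schedule the next fetch
                        break;
                    case SHUTDOWN_IF_IDLE:
                        if (partitions.isEmpty()) {
                            return; // an idle fetcher exits its loop
                        }
                        break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Both getters are only meaningful after the fetcher thread has terminated.
    int partitionCount() { return partitions.size(); }
    int fetchesProcessed() { return fetchesProcessed; }

    // Usage example: a "request handler" enqueues three items and waits for exit.
    static AsyncFetcherSketch demo() {
        AsyncFetcherSketch fetcher = new AsyncFetcherSketch();
        Thread thread = new Thread(fetcher, "fetcher-sketch");
        thread.start();
        fetcher.enqueue(Type.ADD_PARTITION, "topic-0");
        fetcher.enqueue(Type.REMOVE_PARTITION, "topic-0");
        fetcher.enqueue(Type.SHUTDOWN_IF_IDLE, null);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fetcher;
    }
}
```

Because the enqueue call never waits on the fetcher's work, a slow flush in the fetcher would delay when an item takes effect but would not block the request handler thread, which is the property the proposal is after.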



