Posted to dev@kafka.apache.org by tombentley <gi...@git.apache.org> on 2017/09/13 15:59:40 UTC

[GitHub] kafka pull request #3848: KAFKA-5692: Change PreferredReplicaLeaderElectionC...

GitHub user tombentley opened a pull request:

    https://github.com/apache/kafka/pull/3848

    KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient
    
    See also KIP-183.
    
    The contribution is my original work and I license the work to the project under the project's open source license.
    
    This implements the following algorithm:
    
    1. AdminClient sends ElectPreferredLeadersRequest.
    2. KafkaApis receives ElectPreferredLeadersRequest and delegates to
       ReplicaManager.electPreferredLeaders()
    3. ReplicaManager delegates to KafkaController.electPreferredLeaders()
    4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager.
    5. ReplicaManager.electPreferredLeaders()'s callback uses the
       delayedElectPreferredReplicasPurgatory to wait for the results of the
       election to appear in the metadata cache. If there are no results
       because of errors, or because the preferred leaders are already leading
       the partitions then a response is returned immediately.
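
The waiting behaviour in step 5 can be sketched as follows. This is only an illustration of the idea, not the code in this pull request: the class, its fields and the use of CompletableFuture are assumptions made for the example, and it does not use Kafka's real purgatory API. Partitions are identified by plain strings and leaders by broker id.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a delayed operation; not Kafka's purgatory classes.
class DelayedElectionSketch {
    // partition name -> current leader broker id (stand-in for the metadata cache)
    private final Map<String, Integer> metadataCache = new ConcurrentHashMap<>();
    // partition name -> preferred leader that the election should produce
    private final Map<String, Integer> expectedLeaders;
    private final CompletableFuture<Set<String>> response = new CompletableFuture<>();

    DelayedElectionSketch(Map<String, Integer> currentLeaders,
                          Map<String, Integer> expectedLeaders) {
        this.metadataCache.putAll(currentLeaders);
        this.expectedLeaders = expectedLeaders;
        // If nothing is pending (errors only, or leaders already preferred),
        // this completes the response immediately.
        tryComplete();
    }

    // Called whenever the (simulated) metadata cache learns of a new leader.
    void onMetadataUpdate(String partition, int leader) {
        metadataCache.put(partition, leader);
        tryComplete();
    }

    // Completes the response once every expected leader is visible in the cache.
    private void tryComplete() {
        for (Map.Entry<String, Integer> e : expectedLeaders.entrySet()) {
            if (!e.getValue().equals(metadataCache.get(e.getKey()))) {
                return; // at least one election result is not visible yet
            }
        }
        response.complete(new HashSet<>(expectedLeaders.keySet()));
    }

    CompletableFuture<Set<String>> response() {
        return response;
    }
}
```

A caller would create one such object per request and send the response when response() completes. A real implementation would also need a timeout, which this sketch omits.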
    
    In the EventManager work thread the preferred leader is elected as follows:
    
    1. The EventManager runs PreferredReplicaLeaderElection.process()
    2. process() calls KafkaController.onPreferredReplicaElectionWithResults()
    3. KafkaController.onPreferredReplicaElectionWithResults()
       calls the PartitionStateMachine.handleStateChangesWithResults() to
       perform the election (asynchronously the PSM will send LeaderAndIsrRequest
       to the new and old leaders and UpdateMetadataRequest to all brokers)
       then invokes the callback.
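
The hand-off to the work thread described above might look roughly like this. It is a minimal sketch under stated assumptions: a single-threaded executor stands in for the EventManager, partitions are plain strings, and elect() is a hypothetical placeholder for the real state change. None of these names come from the Kafka code base.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: events are processed one at a time on a single thread,
// and the per-partition results are handed back through a callback.
class ControllerEventSketch {
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

    // Enqueue an election event; the callback runs on the event thread.
    void electPreferredLeaders(List<String> partitions,
                               Consumer<Map<String, String>> callback) {
        eventThread.submit(() -> {
            Map<String, String> results = new LinkedHashMap<>();
            for (String partition : partitions) {
                results.put(partition, elect(partition));
            }
            callback.accept(results);
        });
    }

    // Placeholder for the real election: returns an error name, or "NONE" on success.
    private String elect(String partition) {
        return partition.isEmpty() ? "INVALID_PARTITION" : "NONE";
    }

    void shutdown() {
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the callback runs on the event thread, it should do little work itself. In the flow above it only needs to pass the results on to whatever is waiting for them.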
    
    Note the change in parameter type for CollectionUtils.groupDataByTopic().
    This makes sense because the AdminClient APIs use Collection consistently,
    rather than List or Set. If binary compatibility is a consideration the old
    version should be kept, delegating to the new version.
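
For illustration, keeping the old signature as a delegating overload could look like the sketch below. TopicPartitionSketch and CollectionUtilsSketch are hypothetical stand-ins so that the example is self-contained; they are not the real Kafka classes. The old overload matters because already-compiled callers link against the List-typed method descriptor and would fail with NoSuchMethodError if only the Collection version existed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal stand-in for a topic/partition pair.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class CollectionUtilsSketch {
    private CollectionUtilsSketch() { }

    // New version: accepts any Collection (List, Set, ...).
    static Map<String, List<Integer>> groupDataByTopic(
            Collection<TopicPartitionSketch> partitions) {
        Map<String, List<Integer>> byTopic = new HashMap<>();
        for (TopicPartitionSketch tp : partitions) {
            byTopic.computeIfAbsent(tp.topic, t -> new ArrayList<>()).add(tp.partition);
        }
        return byTopic;
    }

    // Old signature kept for binary compatibility; it only delegates.
    // The cast selects the Collection overload and avoids infinite recursion.
    static Map<String, List<Integer>> groupDataByTopic(
            List<TopicPartitionSketch> partitions) {
        return groupDataByTopic((Collection<TopicPartitionSketch>) partitions);
    }
}
```

Source-level callers are unaffected either way: a List argument resolves to the more specific overload, and a Set argument resolves to the Collection one.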
    
    I had to add PartitionStateMachine.handleStateChangesWithResults()
    in order to be able to process a set of state changes in the
    PartitionStateMachine *and get back individual results*.
    At the same time I noticed that all callers of existing handleStateChange()
    were destructuring a TopicAndPartition that they already had in order
    to call handleStateChange(), and that handleStateChange() immediately
    instantiated a new TopicAndPartition. Since TopicAndPartition is immutable
    this is pointless, so I refactored it. handleStateChange() also now returns
    any exception it caught, which is necessary for handleStateChangesWithResults().
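
The relationship between the two methods can be sketched like this. It is a simplified illustration, not the code in this pull request: the real methods are Scala and operate on TopicAndPartition, whereas this sketch uses Java, plain strings, and a made-up validation rule in place of the actual state transition.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of returning per-partition results from a batch of state changes.
class PartitionStateMachineSketch {

    // Returns the exception it caught, if any, instead of only logging it.
    Optional<Exception> handleStateChange(String topicAndPartition, String targetState) {
        try {
            // Made-up rule standing in for the real transition logic.
            if (topicAndPartition.isEmpty()) {
                throw new IllegalArgumentException("unknown partition");
            }
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        }
    }

    // Applies the change to every partition and records each individual outcome.
    Map<String, Optional<Exception>> handleStateChangesWithResults(
            Collection<String> partitions, String targetState) {
        Map<String, Optional<Exception>> results = new LinkedHashMap<>();
        for (String partition : partitions) {
            results.put(partition, handleStateChange(partition, targetState));
        }
        return results;
    }
}
```

With this shape, one failing partition does not hide the outcome for the others, which is what a per-partition response needs.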

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tombentley/kafka KAFKA-5692-elect-preferred

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3848.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3848
    
----
commit 6b9bf178049e1eedfb5f07771cc3c595c02484d9
Author: Tom Bentley <tb...@redhat.com>
Date:   2017-09-06T14:39:24Z

    KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient
    
    See also KIP-183.
    
    This implements the following algorithm:
    
    1. AdminClient sends ElectPreferredLeadersRequest.
    2. KafkaApis receives ElectPreferredLeadersRequest and delegates to
       ReplicaManager.electPreferredLeaders()
    3. ReplicaManager delegates to KafkaController.electPreferredLeaders()
    4. KafkaController adds a PreferredReplicaLeaderElection to the EventManager.
    5. ReplicaManager.electPreferredLeaders()'s callback uses the
       delayedElectPreferredReplicasPurgatory to wait for the results of the
       election to appear in the metadata cache. If there are no results
       because of errors, or because the preferred leaders are already leading
       the partitions then a response is returned immediately.
    
    In the EventManager work thread the preferred leader is elected as follows:
    
    1. The EventManager runs PreferredReplicaLeaderElection.process()
    2. process() calls KafkaController.onPreferredReplicaElectionWithResults()
    3. KafkaController.onPreferredReplicaElectionWithResults()
       calls the PartitionStateMachine.handleStateChangesWithResults() to
       perform the election (asynchronously the PSM will send LeaderAndIsrRequest
       to the new and old leaders and UpdateMetadataRequest to all brokers)
       then invokes the callback.
    
    Note the change in parameter type for CollectionUtils.groupDataByTopic().
    This makes sense because the AdminClient APIs use Collection consistently,
    rather than List or Set. If binary compatibility is a consideration the old
    version should be kept, delegating to the new version.
    
    I had to add PartitionStateMachine.handleStateChangesWithResults()
    in order to be able to process a set of state changes in the
    PartitionStateMachine *and get back individual results*.
    At the same time I noticed that all callers of existing handleStateChange()
    were destructuring a TopicAndPartition that they already had in order
    to call handleStateChange(), and that handleStateChange() immediately
    instantiated a new TopicAndPartition. Since TopicAndPartition is immutable
    this is pointless, so I refactored it. handleStateChange() also now returns
    any exception it caught, which is necessary for handleStateChangesWithResults().

----

