You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/30 05:09:44 UTC

[kafka] branch trunk updated: KAFKA-6617; Improve controller performance by batching reassignment znode write operation

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d99f4a0  KAFKA-6617; Improve controller performance by batching reassignment znode write operation
d99f4a0 is described below

commit d99f4a0ffa65ae3a654b043547f114a16fbe4905
Author: Dong Lin <do...@linkedin.com>
AuthorDate: Tue May 29 22:09:21 2018 -0700

    KAFKA-6617; Improve controller performance by batching reassignment znode write operation
    
    KafkaController currently writes reassignment znode once for every partition that has been successfully reassigned. This is unnecessary and controller should be able to update reassignment znode once to remove all partitions that have been reassigned from the reassignment znode.
    
    Author: Dong Lin <do...@linkedin.com>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
    
    Closes #4659 from lindong28/KAFKA-6617
---
 .../scala/kafka/controller/KafkaController.scala   | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0b50e34..a1d14e6 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -545,7 +545,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       //10. Update AR in ZK with RAR.
       updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
       //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
-      removePartitionFromReassignedPartitions(topicPartition)
+      removePartitionsFromReassignedPartitions(Set(topicPartition))
       //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
@@ -565,10 +565,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
    */
   private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
+    val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
     topicPartitions.foreach { tp =>
       if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
         error(s"Skipping reassignment of $tp since the topic is currently being deleted")
-        removePartitionFromReassignedPartitions(tp)
+        partitionsToBeRemovedFromReassignment.add(tp)
       } else {
         val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
           throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
@@ -581,7 +582,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           if (assignedReplicas == newReplicas) {
             info(s"Partition $tp to be reassigned is already assigned to replicas " +
               s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
-            removePartitionFromReassignedPartitions(tp)
+            partitionsToBeRemovedFromReassignment.add(tp)
           } else {
             try {
               info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
@@ -594,15 +595,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
               case e: Throwable =>
                 error(s"Error completing reassignment of partition $tp", e)
                 // remove the partition from the admin path to unblock the admin client
-                removePartitionFromReassignedPartitions(tp)
+                partitionsToBeRemovedFromReassignment.add(tp)
             }
           }
         } else {
             error(s"Ignoring request to reassign partition $tp that doesn't exist.")
-            removePartitionFromReassignedPartitions(tp)
+            partitionsToBeRemovedFromReassignment.add(tp)
         }
       }
     }
+    removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
   }
 
   private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
@@ -853,14 +855,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * `ControllerContext.partitionsBeingReassigned` must be populated with all partitions being reassigned before this
    * method is invoked to avoid premature deletion of the `reassign_partitions` znode.
    */
-  private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
-    controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
+  private def removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]) {
+    partitionsToBeRemoved.map(controllerContext.partitionsBeingReassigned).foreach { reassignContext =>
       reassignContext.unregisterReassignIsrChangeHandler(zkClient)
     }
 
-    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
+    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned -- partitionsToBeRemoved
 
-    info(s"Removing partition $topicPartition from the list of reassigned partitions in zookeeper")
+    info(s"Removing partitions $partitionsToBeRemoved from the list of reassigned partitions in zookeeper")
 
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
@@ -876,7 +878,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       }
     }
 
-    controllerContext.partitionsBeingReassigned.remove(topicPartition)
+    controllerContext.partitionsBeingReassigned --= partitionsToBeRemoved
   }
 
   private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.