You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/30 05:12:00 UTC

[jira] [Commented] (KAFKA-6617) Improve controller performance by batching reassignment znode write operation

    [ https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494694#comment-16494694 ] 

ASF GitHub Bot commented on KAFKA-6617:
---------------------------------------

lindong28 closed pull request #4659: KAFKA-6617; Improve controller performance by batching reassignment znode write operation
URL: https://github.com/apache/kafka/pull/4659
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0b50e345518..a1d14e649da 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -545,7 +545,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       //10. Update AR in ZK with RAR.
       updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
       //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
-      removePartitionFromReassignedPartitions(topicPartition)
+      removePartitionsFromReassignedPartitions(Set(topicPartition))
       //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
@@ -565,10 +565,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
    */
   private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
+    val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
     topicPartitions.foreach { tp =>
       if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
         error(s"Skipping reassignment of $tp since the topic is currently being deleted")
-        removePartitionFromReassignedPartitions(tp)
+        partitionsToBeRemovedFromReassignment.add(tp)
       } else {
         val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
           throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
@@ -581,7 +582,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
           if (assignedReplicas == newReplicas) {
             info(s"Partition $tp to be reassigned is already assigned to replicas " +
               s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
-            removePartitionFromReassignedPartitions(tp)
+            partitionsToBeRemovedFromReassignment.add(tp)
           } else {
             try {
               info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
@@ -594,15 +595,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
               case e: Throwable =>
                 error(s"Error completing reassignment of partition $tp", e)
                 // remove the partition from the admin path to unblock the admin client
-                removePartitionFromReassignedPartitions(tp)
+                partitionsToBeRemovedFromReassignment.add(tp)
             }
           }
         } else {
             error(s"Ignoring request to reassign partition $tp that doesn't exist.")
-            removePartitionFromReassignedPartitions(tp)
+            partitionsToBeRemovedFromReassignment.add(tp)
         }
       }
     }
+    removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
   }
 
   private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
@@ -853,14 +855,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * `ControllerContext.partitionsBeingReassigned` must be populated with all partitions being reassigned before this
    * method is invoked to avoid premature deletion of the `reassign_partitions` znode.
    */
-  private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
-    controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
+  private def removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]) {
+    partitionsToBeRemoved.map(controllerContext.partitionsBeingReassigned).foreach { reassignContext =>
       reassignContext.unregisterReassignIsrChangeHandler(zkClient)
     }
 
-    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
+    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned -- partitionsToBeRemoved
 
-    info(s"Removing partition $topicPartition from the list of reassigned partitions in zookeeper")
+    info(s"Removing partitions $partitionsToBeRemoved from the list of reassigned partitions in zookeeper")
 
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
@@ -876,7 +878,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       }
     }
 
-    controllerContext.partitionsBeingReassigned.remove(topicPartition)
+    controllerContext.partitionsBeingReassigned --= partitionsToBeRemoved
   }
 
   private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve controller performance by batching reassignment znode write operation
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6617
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6617
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)