Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/19 19:13:56 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10346: KAFKA-12493: The controller should handle the consistency between the controllerContext and the partition replicas assignment on zookeeper

junrao commented on a change in pull request #10346:
URL: https://github.com/apache/kafka/pull/10346#discussion_r597904290



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]

Review comment:
       processPartitionModifications() is now becoming a bit more complicated. Could we add a comment on what it does?
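
      For illustration only, one possible shape of such a comment (the wording is an assumption based on the behavior visible in this diff, not proposed final text):

      ```scala
      /**
       * Handles a modification of the partition assignment znode of an existing topic.
       * New partitions are created, unless the topic is queued for deletion, in which
       * case the previous assignment is written back to ZooKeeper. Existing partitions
       * whose replica assignment was changed out of band are restored from the
       * controller context.
       */
      private def processPartitionModifications(topic: String): Unit = {
        // ...
      }
      ```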

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>

Review comment:
      `newPartitionsToBeAdded` seems verbose since new and added describe the same thing, so it could just be `partitionsToBeAdded`. Similarly, `oldPartitionsToBeModified` could just be `partitionsToBeModified`.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+        val existingPartitionsInZk = zkClient.getChildren(TopicPartitionsZNode.path(topic))
+        val existingPartitionReplicaAssignment = partitionReplicaAssignment
+          .filter(p => existingPartitionsInZk.contains(p._1.partition.toString))

Review comment:
       Could we use `case` to avoid unnamed reference `._1`?
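
      For example, the filter could destructure the pair (a sketch of the suggested form, using the names from this diff):

      ```scala
      val existingPartitionReplicaAssignment = partitionReplicaAssignment
        .filter { case (topicPartition, _) =>
          existingPartitionsInZk.contains(topicPartition.partition.toString)
        }
      ```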

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+        val existingPartitionsInZk = zkClient.getChildren(TopicPartitionsZNode.path(topic))
+        val existingPartitionReplicaAssignment = partitionReplicaAssignment
+          .filter(p => existingPartitionsInZk.contains(p._1.partition.toString))
+          .map { case (tp, _) =>
+            tp -> controllerContext.partitionFullReplicaAssignment(tp)
+          }.toMap
 
-        restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        restorePartitionReplicaAssignment(topic, existingPartitionReplicaAssignment)
       } else {
         // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
         info("Ignoring partition change during topic deletion as no new partitions are added")
       }
-    } else if (partitionsToBeAdded.nonEmpty) {
-      info(s"New partitions to be added $partitionsToBeAdded")
-      partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+    } else if (oldPartitionsToBeModified.nonEmpty) {
+      warn("Skipping modifying existing partitions %s for topic %s, will restore their replica assignment by cache in controllerContext"

Review comment:
      Perhaps it's clearer to say something like "Existing partition assignment modified unexpectedly. Restore ..."?
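
      For example, something along these lines (a sketch of the suggested wording, not final text):

      ```scala
      warn("Existing partition replica assignment for partitions %s of topic %s was modified unexpectedly. Restoring the assignment cached in the controller context."
        .format(oldPartitionsToBeModified.map { case (tp, _) => tp.partition }.mkString(","), topic))
      ```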

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+        val existingPartitionsInZk = zkClient.getChildren(TopicPartitionsZNode.path(topic))

Review comment:
      We already figured out earlier in this method which partitions exist. Do we still need to read them from ZK?
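
      A sketch of one possible reading of this suggestion (an assumption, not the PR's code): reuse the classification already made against the controller context instead of listing the partition znodes again.

      ```scala
      // Partitions the controller context already knows about; newPartitionsToBeAdded
      // was computed above as exactly the ones it does not know about.
      val existingPartitionReplicaAssignment = partitionReplicaAssignment
        .filter { case (topicPartition, _) => !newPartitionsToBeAdded.contains(topicPartition) }
        .map { case (tp, _) => tp -> controllerContext.partitionFullReplicaAssignment(tp) }
        .toMap

      restorePartitionReplicaAssignment(topic, existingPartitionReplicaAssignment)
      ```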

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))

Review comment:
      Hmm, it doesn't seem that partitionReplicaAssignment is iterated in partition order, so I am not sure dropWhile does what's expected.
   
   Also, could we use `case` to avoid unnamed reference `._1`?
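
      For reference, dropWhile only drops a leading prefix in the map's iteration order, so new partitions interleaved later in that order would survive. A filter states the intent directly; a sketch using the names from this diff:

      ```scala
      val existingPartitionsInContext = partitionReplicaAssignment.filter { case (topicPartition, _) =>
        !newPartitionsToBeAdded.contains(topicPartition)
      }
      ```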

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+        val existingPartitionsInZk = zkClient.getChildren(TopicPartitionsZNode.path(topic))
+        val existingPartitionReplicaAssignment = partitionReplicaAssignment
+          .filter(p => existingPartitionsInZk.contains(p._1.partition.toString))
+          .map { case (tp, _) =>
+            tp -> controllerContext.partitionFullReplicaAssignment(tp)
+          }.toMap
 
-        restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        restorePartitionReplicaAssignment(topic, existingPartitionReplicaAssignment)
       } else {
         // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
         info("Ignoring partition change during topic deletion as no new partitions are added")
       }
-    } else if (partitionsToBeAdded.nonEmpty) {
-      info(s"New partitions to be added $partitionsToBeAdded")
-      partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+    } else if (oldPartitionsToBeModified.nonEmpty) {
+      warn("Skipping modifying existing partitions %s for topic %s, will restore their replica assignment by cache in controllerContext"
+        .format(oldPartitionsToBeModified.map(_._1.partition).mkString(","), topic))
+      val restoreOldPartitionReplicaAssignment = partitionReplicaAssignment.map { case (topicPartition: TopicPartition, assignedReplicas: ReplicaAssignment) =>
+        if (oldPartitionsToBeModified.contains(topicPartition)) {
+          (topicPartition, ReplicaAssignment(controllerContext.partitionReplicaAssignment(topicPartition), Seq.empty, Seq.empty))
+        } else {
+          (topicPartition, assignedReplicas)
+        }
+      }
+      restorePartitionReplicaAssignment(topic, restoreOldPartitionReplicaAssignment)
+    } else if (newPartitionsToBeAdded.nonEmpty) {

Review comment:
      It's possible that both `oldPartitionsToBeModified` and `newPartitionsToBeAdded` are nonEmpty, and we need to handle that case.
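
      A sketch of one way that could look (an assumption about the intent, not the PR's code): handle the two sets independently rather than as mutually exclusive else-if branches.

      ```scala
      if (oldPartitionsToBeModified.nonEmpty) {
        // Restore the cached assignment for the unexpectedly modified partitions
        // (restoreOldPartitionReplicaAssignment would be built as above).
        restorePartitionReplicaAssignment(topic, restoreOldPartitionReplicaAssignment)
      }
      if (newPartitionsToBeAdded.nonEmpty) {
        info(s"New partitions to be added $newPartitionsToBeAdded")
        // ... then register the new partitions as in the existing branch ...
      }
      ```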

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1680,45 +1680,61 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      restorePartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
     ): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
-      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
-      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment
-        .filter(p => existingPartitions.contains(p._1.partition.toString))
-        .map { case (tp, _) =>
-          tp -> controllerContext.partitionFullReplicaAssignment(tp)
-      }.toMap
-
       zkClient.setTopicAssignment(topic,
         controllerContext.topicIds.get(topic),
-        existingPartitionReplicaAssignment,
+        restorePartitionReplicaAssignment,
         controllerContext.epochZkVersion)
     }
 
     if (!isActive) return
     val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
-    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+    val newPartitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
       controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
     }
 
+    val existingPartitionsInContext = partitionReplicaAssignment.dropWhile(partitionAndReplica => newPartitionsToBeAdded.contains(partitionAndReplica._1))
+    val oldPartitionsToBeModified = existingPartitionsInContext.filter{ case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).diff(partitionReplicaAssignment(topicPartition).replicas).nonEmpty
+    }
+
     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
-      if (partitionsToBeAdded.nonEmpty) {
+      if (newPartitionsToBeAdded.nonEmpty) {
         warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          .format(newPartitionsToBeAdded.map(_._1.partition).mkString(","), topic))

Review comment:
       Could we use `case` to avoid unnamed reference `_._1`?
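
      For example (a sketch of the suggested form):

      ```scala
      warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
        .format(newPartitionsToBeAdded.map { case (tp, _) => tp.partition }.mkString(","), topic))
      ```

      Alternatively, `newPartitionsToBeAdded.keys.map(_.partition)` avoids destructuring the pair altogether.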




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org