You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/05/26 19:14:58 UTC

[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

cmccabe commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207225622


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
-        if (!topicBatch.isEmpty) {
-          recordConsumer.accept(topicBatch)
-          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-        }
+        override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+          if (!topicBatch.isEmpty) {
+            recordConsumer.accept(topicBatch)
+            topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+          }
 
-        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-          .setName(topicName)
-          .setTopicId(topicId), 0.toShort))
-      }
+          topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId), 0.toShort))
 
-      override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
-        val record = new PartitionRecord()
-          .setTopicId(topicIdPartition.topicId())
-          .setPartitionId(topicIdPartition.partition())
-          .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-          .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-          .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-          .setLeader(partitionRegistration.leader)
-          .setLeaderEpoch(partitionRegistration.leaderEpoch)
-          .setPartitionEpoch(partitionRegistration.partitionEpoch)
-          .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-        partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-      }
+          // This breaks the abstraction a bit, but the topic configs belong in the topic batch

Review Comment:
   It's not really required for the topic config records to come right after the topics. It would be OK to do it in a separate section as we do with snapshots.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org