Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/05/26 17:11:25 UTC

[GitHub] [kafka] mumrah opened a new pull request, #13767: KAFKA-15004: Fix configuration dual-write during migration

mumrah opened a new pull request, #13767:
URL: https://github.com/apache/kafka/pull/13767

   This PR builds on top of #13736.
   
   Fixes the following:
   
   * Topic configs are not synced while handling a snapshot.
   * New broker/topic configs in KRaft that did not exist in ZK are not synced to ZK.
   * Sensitive configs are not encoded when written to ZooKeeper (see the sketch below).
   * Handle topic configs in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot.
   
   Added tests to ensure we no longer have the above-mentioned issues.
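   
   For context, a minimal sketch of the idea behind the sensitive-config fix: encode password-typed values before the config Properties are persisted to ZooKeeper. Here `isSensitive` and `encode` are hypothetical stand-ins for Kafka's actual predicate and password encoder, not the merged code:
   
   ```
   import java.util.Properties
   
   // Encode password-typed values before persisting a config Properties to ZK.
   // `isSensitive` and `encode` are illustrative stand-ins, not Kafka's API.
   def encodeSensitiveConfigs(props: Properties,
                              isSensitive: String => Boolean,
                              encode: String => String): Properties = {
     val out = new Properties()
     props.stringPropertyNames().forEach { name =>
       val value = props.getProperty(name)
       out.setProperty(name, if (isSensitive(name)) encode(value) else value)
     }
     out
   }
   ```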
   
   Co-Authored-By: Akhilesh C <ak...@users.noreply.github.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
-        if (!topicBatch.isEmpty) {
-          recordConsumer.accept(topicBatch)
-          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-        }
+        override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+          if (!topicBatch.isEmpty) {
+            recordConsumer.accept(topicBatch)
+            topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+          }
 
-        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-          .setName(topicName)
-          .setTopicId(topicId), 0.toShort))
-      }
+          topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId), 0.toShort))
 
-      override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
-        val record = new PartitionRecord()
-          .setTopicId(topicIdPartition.topicId())
-          .setPartitionId(topicIdPartition.partition())
-          .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-          .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-          .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-          .setLeader(partitionRegistration.leader)
-          .setLeaderEpoch(partitionRegistration.leaderEpoch)
-          .setPartitionEpoch(partitionRegistration.partitionEpoch)
-          .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-        partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-      }
+          // This breaks the abstraction a bit, but the topic configs belong in the topic batch

Review Comment:
   I was considering the fact that we don't apply the migration records atomically during the migration. I think it's possible for the controller or broker to publish the migration metadata before it's all committed. In that case, I think it's probably safer to include the config records with the topic batch.
   
   This won't be an issue once we implement KIP-868.
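   
   For illustration only, a sketch (not the PR's exact code) of what emitting a topic's configs into the same batch as its TopicRecord could look like; `topicProps` is an assumed java.util.Properties read from the topic's config znode:
   
   ```
   import java.util.Properties
   import scala.jdk.CollectionConverters._
   import org.apache.kafka.common.config.ConfigResource
   import org.apache.kafka.common.metadata.ConfigRecord
   import org.apache.kafka.server.common.ApiMessageAndVersion
   
   // Append one ConfigRecord per topic config to the batch that already holds
   // the topic's TopicRecord, so both land in the same migration batch.
   def appendTopicConfigs(topicName: String,
                          topicProps: Properties,
                          topicBatch: java.util.List[ApiMessageAndVersion]): Unit = {
     topicProps.asScala.foreach { case (key, value) =>
       topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
         .setResourceType(ConfigResource.Type.TOPIC.id)
         .setResourceName(topicName)
         .setName(key)
         .setValue(value), 0.toShort))
     }
   }
   ```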





[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207234024


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -194,23 +195,44 @@ void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDe
     }
 
     void handleConfigsSnapshot(ConfigurationsImage configsImage) {
-        Set<ConfigResource> brokersToUpdate = new HashSet<>();
+        Set<ConfigResource> newResources = new HashSet<>(configsImage.resourceData().keySet());

Review Comment:
   I think we should throw an exception if we get a ConfigResource whose type isn't BROKER or TOPIC.
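   
   An illustrative sketch of the suggested guard (the merged code may differ), rejecting any resource type other than BROKER or TOPIC:
   
   ```
   import org.apache.kafka.common.config.ConfigResource
   
   // Fail fast on resource types the dual-write path does not handle.
   def checkSupportedResourceType(resource: ConfigResource): Unit = {
     resource.`type`() match {
       case ConfigResource.Type.BROKER | ConfigResource.Type.TOPIC => // supported
       case other =>
         throw new RuntimeException(s"Unexpected config resource type $other for $resource")
     }
   }
   ```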





[GitHub] [kafka] mumrah commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207096649


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {

Review Comment:
   Just fixing the indentation here





[GitHub] [kafka] mumrah merged pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah merged PR #13767:
URL: https://github.com/apache/kafka/pull/13767




[GitHub] [kafka] mumrah commented on pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13767:
URL: https://github.com/apache/kafka/pull/13767#issuecomment-1565686986

   Got a reasonable test run:
   
   ```
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13767/6/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandTest/Build___JDK_17_and_Scala_2_13____6__Type_Raft_Isolated__Name_testDescribeQuorumReplicationSuccessful__MetadataVersion_3_5_IV2__Security_PLAINTEXT/)
   [Build / JDK 8 and Scala 2.12 / kafka.admin.DescribeConsumerGroupTest.testDescribeStateOfExistingGroupWithRoundRobinAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13767/6/testReport/junit/kafka.admin/DescribeConsumerGroupTest/Build___JDK_8_and_Scala_2_12___testDescribeStateOfExistingGroupWithRoundRobinAssignor__/)
   [Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13767/6/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testRackAwareRangeAssignor__/)
   [Build / JDK 11 and Scala 2.13 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13767/6/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13767/6/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   ```




[GitHub] [kafka] cmccabe commented on a diff in pull request #13767: KAFKA-15004: Fix configuration dual-write during migration

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207225622


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
-        if (!topicBatch.isEmpty) {
-          recordConsumer.accept(topicBatch)
-          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-        }
+        override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+          if (!topicBatch.isEmpty) {
+            recordConsumer.accept(topicBatch)
+            topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+          }
 
-        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-          .setName(topicName)
-          .setTopicId(topicId), 0.toShort))
-      }
+          topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId), 0.toShort))
 
-      override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
-        val record = new PartitionRecord()
-          .setTopicId(topicIdPartition.topicId())
-          .setPartitionId(topicIdPartition.partition())
-          .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-          .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-          .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-          .setLeader(partitionRegistration.leader)
-          .setLeaderEpoch(partitionRegistration.leaderEpoch)
-          .setPartitionEpoch(partitionRegistration.partitionEpoch)
-          .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-        partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-      }
+          // This breaks the abstraction a bit, but the topic configs belong in the topic batch

Review Comment:
   It's not really required for the topic config records to come right after the topics. It would be OK to do it in a separate section as we do with snapshots.
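   
   A sketch of that alternative, with a hypothetical `configsForTopic` lookup: emit each topic's ConfigRecords as their own batches in a separate section after the topic iteration completes, instead of interleaving them with the topic batches:
   
   ```
   import org.apache.kafka.common.config.ConfigResource
   import org.apache.kafka.common.metadata.ConfigRecord
   import org.apache.kafka.server.common.ApiMessageAndVersion
   
   // Emit each topic's ConfigRecords as a standalone batch in a separate
   // "configs" section, after all topic batches have been consumed.
   def emitConfigSection(topicNames: Iterable[String],
                         configsForTopic: String => Map[String, String],
                         recordConsumer: java.util.function.Consumer[java.util.List[ApiMessageAndVersion]]): Unit = {
     topicNames.foreach { topic =>
       val batch = new java.util.ArrayList[ApiMessageAndVersion]()
       configsForTopic(topic).foreach { case (name, value) =>
         batch.add(new ApiMessageAndVersion(new ConfigRecord()
           .setResourceType(ConfigResource.Type.TOPIC.id)
           .setResourceName(topic)
           .setName(name)
           .setValue(value), 0.toShort))
       }
       if (!batch.isEmpty) recordConsumer.accept(batch)
     }
   }
   ```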


