You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/07 14:28:24 UTC

[kafka] branch 3.5 updated: KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new 9b9de0fc28e KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
9b9de0fc28e is described below

commit 9b9de0fc28e51b8f05c6c06014558f12f83a53a6
Author: andymg3 <97...@users.noreply.github.com>
AuthorDate: Fri Jul 7 10:16:51 2023 -0400

    KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
    
    Fixes a bug where we don't send UMR and LISR requests in dual-write mode when new partitions are created. Prior to this patch, KRaftMigrationZkWriter was mutating the internal data-structures of TopicDelta which prevented MigrationPropagator from sending UMR and LISR for the changed partitions.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/zk/ZkMigrationIntegrationTest.scala           | 17 ++++++++++++++++-
 .../metadata/migration/KRaftMigrationZkWriter.java      |  4 ++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index fc5da56721f..662f706b095 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -495,8 +495,15 @@ class ZkMigrationIntegrationTest {
       // Verify the changes made to KRaft are seen in ZK
       verifyTopicPartitionMetadata(topicName, existingPartitions, zkClient)
 
+      val newPartitionCount = 3
       log.info("Create new partitions with AdminClient")
-      admin.createPartitions(Map(topicName -> NewPartitions.increaseTo(3)).asJava).all().get(60, TimeUnit.SECONDS)
+      admin.createPartitions(Map(topicName -> NewPartitions.increaseTo(newPartitionCount)).asJava).all().get(60, TimeUnit.SECONDS)
+      val (topicDescOpt, _) = TestUtils.computeUntilTrue(topicDesc(topicName, admin))(td => {
+        td.isDefined && td.get.partitions().asScala.size == newPartitionCount
+      })
+      assertTrue(topicDescOpt.isDefined)
+      val partitions = topicDescOpt.get.partitions().asScala
+      assertEquals(newPartitionCount, partitions.size)
 
       // Verify the changes seen in Zk.
       verifyTopicPartitionMetadata(topicName, existingPartitions ++ Seq(new TopicPartition(topicName, 2)), zkClient)
@@ -522,6 +529,14 @@ class ZkMigrationIntegrationTest {
     }, "Unable to find topic partition metadata")
   }
 
+  def topicDesc(topic: String, admin: Admin): Option[TopicDescription] = {
+    try {
+      admin.describeTopics(util.Collections.singleton(topic)).allTopicNames().get().asScala.get(topic)
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
   def allocateProducerId(bootstrapServers: String): Unit = {
     val props = new Properties()
     props.put("bootstrap.servers", bootstrapServers)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 046ab2fdc12..485beb763c5 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -291,8 +291,8 @@ public class KRaftMigrationZkWriter {
                             topicId,
                             topicsImage.getTopic(topicId).partitions(),
                             migrationState));
-                Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
-                Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
+                Map<Integer, PartitionRegistration> newPartitions = new HashMap<>(topicDelta.newPartitions());
+                Map<Integer, PartitionRegistration> changedPartitions = new HashMap<>(topicDelta.partitionChanges());
                 if (!newPartitions.isEmpty()) {
                     operationConsumer.accept(
                         UPDATE_PARTITON,