You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this archive page (the hyperlink was not preserved in the plain-text export).
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/07 14:28:24 UTC
[kafka] branch 3.5 updated: KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.5 by this push:
new 9b9de0fc28e KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
9b9de0fc28e is described below
commit 9b9de0fc28e51b8f05c6c06014558f12f83a53a6
Author: andymg3 <97...@users.noreply.github.com>
AuthorDate: Fri Jul 7 10:16:51 2023 -0400
KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
Fixes a bug where we don't send UMR and LISR requests in dual-write mode when new partitions are created. Prior to this patch, KRaftMigrationZkWriter was mutating the internal data-structures of TopicDelta which prevented MigrationPropagator from sending UMR and LISR for the changed partitions.
Reviewers: David Arthur <mu...@gmail.com>
---
.../kafka/zk/ZkMigrationIntegrationTest.scala | 17 ++++++++++++++++-
.../metadata/migration/KRaftMigrationZkWriter.java | 4 ++--
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index fc5da56721f..662f706b095 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -495,8 +495,15 @@ class ZkMigrationIntegrationTest {
// Verify the changes made to KRaft are seen in ZK
verifyTopicPartitionMetadata(topicName, existingPartitions, zkClient)
+ val newPartitionCount = 3
log.info("Create new partitions with AdminClient")
- admin.createPartitions(Map(topicName -> NewPartitions.increaseTo(3)).asJava).all().get(60, TimeUnit.SECONDS)
+ admin.createPartitions(Map(topicName -> NewPartitions.increaseTo(newPartitionCount)).asJava).all().get(60, TimeUnit.SECONDS)
+ val (topicDescOpt, _) = TestUtils.computeUntilTrue(topicDesc(topicName, admin))(td => {
+ td.isDefined && td.get.partitions().asScala.size == newPartitionCount
+ })
+ assertTrue(topicDescOpt.isDefined)
+ val partitions = topicDescOpt.get.partitions().asScala
+ assertEquals(newPartitionCount, partitions.size)
// Verify the changes seen in Zk.
verifyTopicPartitionMetadata(topicName, existingPartitions ++ Seq(new TopicPartition(topicName, 2)), zkClient)
@@ -522,6 +529,14 @@ class ZkMigrationIntegrationTest {
}, "Unable to find topic partition metadata")
}
+ def topicDesc(topic: String, admin: Admin): Option[TopicDescription] = {
+ try {
+ admin.describeTopics(util.Collections.singleton(topic)).allTopicNames().get().asScala.get(topic)
+ } catch {
+ case _: Throwable => None
+ }
+ }
+
def allocateProducerId(bootstrapServers: String): Unit = {
val props = new Properties()
props.put("bootstrap.servers", bootstrapServers)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 046ab2fdc12..485beb763c5 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -291,8 +291,8 @@ public class KRaftMigrationZkWriter {
topicId,
topicsImage.getTopic(topicId).partitions(),
migrationState));
- Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
- Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
+ Map<Integer, PartitionRegistration> newPartitions = new HashMap<>(topicDelta.newPartitions());
+ Map<Integer, PartitionRegistration> changedPartitions = new HashMap<>(topicDelta.partitionChanges());
if (!newPartitions.isEmpty()) {
operationConsumer.accept(
UPDATE_PARTITON,