Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/05/22 19:26:20 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…

mumrah commented on code in PR #13735:
URL: https://github.com/apache/kafka/pull/13735#discussion_r1200917708


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -128,23 +133,55 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
                         return; // topic deleted in KRaft
                     }
 
+                    // If a previous ZK write failed, ZooKeeper could contain only some of
+                    // the partitions (or none at all) for existing topics. So accumulate
+                    // the partition ids to check for any missing partitions in ZK.
+                    partitionsInZk
+                        .computeIfAbsent(topic.id(), __ -> new HashSet<>())
+                        .add(topicIdPartition.partition());
+
                     // Check if the KRaft partition state changed
                     PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
                     if (!kraftPartition.equals(partitionRegistration)) {
                         changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
                             .put(topicIdPartition.partition(), kraftPartition);
                     }
+
+                    // Check if the partition assignment has changed. This will need a topic update.
+                    if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
+                        changedTopics.add(topic.id());
+                    }
                 }
             });
 
-        createdTopics.forEach(topicId -> {
+        // Check for any partition changes in existing topics.
+        for (Uuid topicId : topicsInZk) {

Review Comment:
   nit: let's use `forEach` here to be consistent with the other loops
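
   For illustration, the suggestion amounts to something like the following self-contained sketch (class name and the use of `String` in place of Kafka's `Uuid` are stand-ins, not the PR's actual code):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   public class ForEachSketch {
       public static void main(String[] args) {
           Set<String> topicsInZk = new HashSet<>();
           topicsInZk.add("topic-a");
           topicsInZk.add("topic-b");

           // Instead of: for (String topicId : topicsInZk) { ... }
           // use forEach to match the surrounding visitor-style loops:
           Set<String> visited = new HashSet<>();
           topicsInZk.forEach(topicId -> visited.add(topicId));

           if (!visited.equals(topicsInZk)) throw new AssertionError();
       }
   }
   ```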



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -128,23 +133,55 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
                         return; // topic deleted in KRaft
                     }
 
+                    // If a previous ZK write failed, ZooKeeper could contain only some of
+                    // the partitions (or none at all) for existing topics. So accumulate
+                    // the partition ids to check for any missing partitions in ZK.
+                    partitionsInZk
+                        .computeIfAbsent(topic.id(), __ -> new HashSet<>())
+                        .add(topicIdPartition.partition());
+
                     // Check if the KRaft partition state changed
                     PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
                     if (!kraftPartition.equals(partitionRegistration)) {
                         changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
                             .put(topicIdPartition.partition(), kraftPartition);
                     }
+
+                    // Check if the partition assignment has changed. This will need a topic update.
+                    if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
+                        changedTopics.add(topic.id());
+                    }
                 }
             });
 
-        createdTopics.forEach(topicId -> {
+        // Check for any partition changes in existing topics.
+        for (Uuid topicId : topicsInZk) {
+            TopicImage topic = topicsImage.getTopic(topicId);
+            Set<Integer> topicPartitionsInZk = partitionsInZk.getOrDefault(topicId, new HashSet<>());

Review Comment:
   Can we use `computeIfAbsent` here? Otherwise we'll end up allocating a new HashSet on each iteration of the loop
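
   A minimal sketch of the difference (hypothetical class, simplified key type): `getOrDefault` evaluates its default argument on every call, so a fresh `HashSet` is allocated even when the key is present, whereas `computeIfAbsent` only invokes its mapping function on a miss. Note the subtle semantic difference that `computeIfAbsent` also inserts the new value into the map, which appears harmless here since the map is read-only afterwards:

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class ComputeIfAbsentSketch {
       public static void main(String[] args) {
           Map<String, Set<Integer>> partitionsInZk = new HashMap<>();

           // Allocates the default HashSet unconditionally, present key or not:
           Set<Integer> viaDefault = partitionsInZk.getOrDefault("t", new HashSet<>());
           // Key is still absent afterwards:
           if (partitionsInZk.containsKey("t")) throw new AssertionError();

           // Only allocates (and stores) when the key is missing:
           Set<Integer> viaCompute = partitionsInZk.computeIfAbsent("t", __ -> new HashSet<>());
           if (!partitionsInZk.containsKey("t")) throw new AssertionError();
       }
   }
   ```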



##########
core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala:
##########
@@ -259,4 +259,60 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
     assertEquals("300000", configs.last.value())
   }
+
+  @Test
+  def testUpdateExistingTopicWithNewAndChangedPartitions(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val topicId = Uuid.randomUuid()
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Change the assignment of the partitions, update the topic, and verify that
+    // the change is reflected.
+    val changedPartitions = Map(
+      0 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().updateTopic("test", topicId, changedPartitions, migrationState)
+    assertEquals(2, migrationState.migrationZkVersion())
+
+    // Read the changed partition with zkClient.
+    val topicReplicaAssignmentFromZk = zkClient.getReplicaAssignmentAndTopicIdForTopics(Set("test"))
+    assertEquals(1, topicReplicaAssignmentFromZk.size)
+    assertEquals(Some(topicId), topicReplicaAssignmentFromZk.head.topicId);
+    topicReplicaAssignmentFromZk.head.assignment.foreach { case (tp, assignment) =>
+      tp.partition() match {
+        case p if p <= 1 =>
+          assertEquals(changedPartitions.get(p).replicas.toSeq, assignment.replicas)
+          assertEquals(changedPartitions.get(p).addingReplicas.toSeq, assignment.addingReplicas)
+          assertEquals(changedPartitions.get(p).removingReplicas.toSeq, assignment.removingReplicas)
+        case p => fail(s"Found unknown partition $p")
+      }
+    }
+
+    // Add a new partition.
+    val newPartition = Map(
+      2 -> new PartitionRegistration(Array(2, 3, 4), Array(2, 3, 4), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => int2Integer(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().createTopicPartitions(Map("test" -> newPartition).asJava, migrationState)
+    assertEquals(3, migrationState.migrationZkVersion())
+
+    // Read new partition from Zk.
+    val newPartitionFromZk = zkClient.getTopicPartitionState(new TopicPartition("test", 2))
+    assertTrue(newPartitionFromZk.isDefined)
+    newPartitionFromZk.foreach { part =>
+      val expectedPartition = newPartition.get(2)
+      assertEquals(expectedPartition.leader, part.leaderAndIsr.leader)
+      // KRaft increments the partition epoch on change.
+      assertEquals(expectedPartition.partitionEpoch + 1, part.leaderAndIsr.partitionEpoch)
+      assertEquals(expectedPartition.leaderEpoch, part.leaderAndIsr.leaderEpoch)
+      assertEquals(expectedPartition.leaderRecoveryState, part.leaderAndIsr.leaderRecoveryState)
+      assertEquals(expectedPartition.isr.toList, part.leaderAndIsr.isr)
+    }

Review Comment:
   Can you add a test case for adding a new partition and changing existing partitions?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -184,11 +232,36 @@ void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDe
                         topicDelta.partitionChanges(),
                         migrationState));
             } else {
-                operationConsumer.accept(
-                    "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
-                    migrationState -> migrationClient.topicClient().updateTopicPartitions(
-                        Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
-                        migrationState));
+                if (topicDelta.hasPartitionsWithAssignmentChanges())
+                    operationConsumer.accept(
+                        "Updating Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().updateTopic(
+                            topicDelta.name(),
+                            topicId,
+                            topicsImage.getTopic(topicId).partitions(),
+                            migrationState));
+                Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
+                Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
+                if (!newPartitions.isEmpty()) {
+                    operationConsumer.accept(
+                        "Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().createTopicPartitions(
+                            Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
+                            migrationState));
+                    changedPartitions = changedPartitions

Review Comment:
   Instead of reassigning, could we just remove the newPartitions from changedPartitions?
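
   A sketch of the in-place alternative (hypothetical class, `String` values standing in for `PartitionRegistration`). This assumes `changedPartitions` is mutable; if `topicDelta.partitionChanges()` returns an unmodifiable map, a copy would be needed first:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class RemoveSketch {
       public static void main(String[] args) {
           Map<Integer, String> newPartitions = new HashMap<>();
           newPartitions.put(2, "new");

           Map<Integer, String> changedPartitions = new HashMap<>();
           changedPartitions.put(1, "changed");
           changedPartitions.put(2, "also-new");

           // Instead of rebuilding the map via a stream, drop the new
           // partition ids in place:
           changedPartitions.keySet().removeAll(newPartitions.keySet());

           if (changedPartitions.containsKey(2)) throw new AssertionError();
           if (!changedPartitions.containsKey(1)) throw new AssertionError();
       }
   }
   ```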



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -128,23 +133,55 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
                         return; // topic deleted in KRaft
                     }
 
+                    // If a previous ZK write failed, ZooKeeper could contain only some of
+                    // the partitions (or none at all) for existing topics. So accumulate
+                    // the partition ids to check for any missing partitions in ZK.
+                    partitionsInZk
+                        .computeIfAbsent(topic.id(), __ -> new HashSet<>())
+                        .add(topicIdPartition.partition());
+
                     // Check if the KRaft partition state changed
                     PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
                     if (!kraftPartition.equals(partitionRegistration)) {
                         changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
                             .put(topicIdPartition.partition(), kraftPartition);
                     }
+
+                    // Check if the partition assignment has changed. This will need a topic update.
+                    if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
+                        changedTopics.add(topic.id());
+                    }
                 }
             });
 
-        createdTopics.forEach(topicId -> {
+        // Check for any partition changes in existing topics.
+        for (Uuid topicId : topicsInZk) {
+            TopicImage topic = topicsImage.getTopic(topicId);
+            Set<Integer> topicPartitionsInZk = partitionsInZk.getOrDefault(topicId, new HashSet<>());
+            if (!topicPartitionsInZk.equals(topic.partitions().keySet())) {
+                Map<Integer, PartitionRegistration> newTopicPartitions = new HashMap<>(topic.partitions());
+                topicPartitionsInZk.forEach(newTopicPartitions::remove);
+                newPartitions.put(topicId, newTopicPartitions);
+                changedTopics.add(topicId);

Review Comment:
   I think it's possible that we can see a difference between `topicPartitionsInZk` (computed based on the partition znodes) and the topic assignment in Kraft, but actually have a correct assignment in ZK (stored in the topic znode). That said, I don't think there's any harm in re-writing the kraft assignment to ZK.



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -184,11 +232,36 @@ void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDe
                         topicDelta.partitionChanges(),
                         migrationState));
             } else {
-                operationConsumer.accept(
-                    "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
-                    migrationState -> migrationClient.topicClient().updateTopicPartitions(
-                        Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
-                        migrationState));
+                if (topicDelta.hasPartitionsWithAssignmentChanges())
+                    operationConsumer.accept(
+                        "Updating Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().updateTopic(
+                            topicDelta.name(),
+                            topicId,
+                            topicsImage.getTopic(topicId).partitions(),
+                            migrationState));
+                Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
+                Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
+                if (!newPartitions.isEmpty()) {
+                    operationConsumer.accept(
+                        "Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().createTopicPartitions(
+                            Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),

Review Comment:
   Should this be passing `newPartitions` to `createTopicPartitions`?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -184,11 +232,36 @@ void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDe
                         topicDelta.partitionChanges(),
                         migrationState));
             } else {
-                operationConsumer.accept(
-                    "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
-                    migrationState -> migrationClient.topicClient().updateTopicPartitions(
-                        Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
-                        migrationState));
+                if (topicDelta.hasPartitionsWithAssignmentChanges())
+                    operationConsumer.accept(
+                        "Updating Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().updateTopic(
+                            topicDelta.name(),
+                            topicId,
+                            topicsImage.getTopic(topicId).partitions(),
+                            migrationState));
+                Map<Integer, PartitionRegistration> newPartitions = topicDelta.newPartitions();
+                Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges();
+                if (!newPartitions.isEmpty()) {
+                    operationConsumer.accept(
+                        "Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId,
+                        migrationState -> migrationClient.topicClient().createTopicPartitions(
+                            Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
+                            migrationState));
+                    changedPartitions = changedPartitions
+                        .entrySet()
+                        .stream()
+                        .filter(entry -> !newPartitions.containsKey(entry.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                }
+                if (!changedPartitions.isEmpty()) {
+                    Map<Integer, PartitionRegistration> finalChangedPartitions = changedPartitions;

Review Comment:
   Do we need this variable?
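
   The extra local appears to exist because `changedPartitions` is reassigned earlier in the method, and Java lambdas may only capture locals that are effectively final; avoiding the reassignment would also remove the need for it. A minimal sketch of the constraint (hypothetical names):

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;

   public class EffectivelyFinalSketch {
       public static void main(String[] args) {
           Map<Integer, String> changed = new HashMap<>();
           changed = new HashMap<>(changed); // reassigned: no longer effectively final

           // Capturing `changed` directly would not compile:
           // Supplier<Integer> s = () -> changed.size(); // error: not effectively final

           // A final copy restores capturability:
           final Map<Integer, String> finalChanged = changed;
           Supplier<Integer> s = () -> finalChanged.size();

           if (s.get() != 0) throw new AssertionError();
       }
   }
   ```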



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -128,23 +133,55 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
                         return; // topic deleted in KRaft
                     }
 
+                    // If a previous ZK write failed, ZooKeeper could contain only some of
+                    // the partitions (or none at all) for existing topics. So accumulate
+                    // the partition ids to check for any missing partitions in ZK.
+                    partitionsInZk

Review Comment:
   Does this also cover the base case of new partitions in a snapshot?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org