You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ahuang98 (via GitHub)" <gi...@apache.org> on 2023/06/06 07:53:25 UTC

[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

ahuang98 commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1218675188


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -267,6 +267,9 @@ void handleTopicsDelta(
     ) {
         topicsDelta.deletedTopicIds().forEach(topicId -> {
             String name = deletedTopicNameResolver.apply(topicId);
+            if (name == null) {

Review Comment:
   curious if we hit a bug which resulted in this check as well?



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
             (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
         writer.handleSnapshot(image, consumer);
         assertEquals(1, opCounts.remove("CreateTopic"));
-        assertEquals(1, opCounts.remove("UpdatePartition"));
+        assertEquals(1, opCounts.remove("UpdatePartitions"));
         assertEquals(1, opCounts.remove("UpdateTopic"));
         assertEquals(0, opCounts.size());
         assertEquals("bar", topicClient.createdTopics.get(0));
     }
+
+    @Test
+    public void testDeleteTopicFromSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap());
+            }
+        };
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+
+        opCounts.clear();
+        topicClient.reset();
+        writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+    }
+
+    @FunctionalInterface
+    interface TopicVerifier {
+        void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+    }
+
+    void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+        // Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier
+        Uuid topicId = Uuid.randomUuid();
+        Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
+        partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1));
+        partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1));
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                visitor.visitTopic("spam", topicId, assignments);
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0));
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1));
+            }
+        };
+
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+        delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+        delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message());
+        delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 1).message());
+        TopicsImage image = delta.apply();
+
+        verifier.verify(topicId, image, topicClient, writer);
+    }
+
+    @Test
+    public void testUpdatePartitionsFromSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(0, opCounts.size(), "No operations expected since the data is the same");
+
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));

Review Comment:
   nit: add comment, something like `// no-op` above



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -223,16 +223,16 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
             );
             ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
             operationConsumer.accept(
-                UPDATE_TOPIC_CONFIG,
-                "Updating Configs for Topic " + topicName + ", ID " + topicId,
+                DELETE_TOPIC_CONFIG,

Review Comment:
   did the new tests catch this bug?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -475,12 +478,13 @@ void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationCon
     void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
         Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
         updatedResources.forEach(configResource -> {
+            String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
             Map<String, String> props = configsImage.configMapForResource(configResource);
             if (props.isEmpty()) {
-                operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState ->
+                operationConsumer.accept(opType, "Delete configs for " + configResource, migrationState ->

Review Comment:
   do we need to set opType within the if else statements? 
   e.g. 
   ```
   if (props.isEmpty()) {
     String opType = brokerOrTopicOpType(configResource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG);
     operationConsumer.accept(opType, "Delete configs for "....)
   } else {
     String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
     operationConsumer.accept(opType, "Update configs for "....)
     
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org